RDDに次の関数をマップして、各要素のstart、length、およびidから渡します。 kとkmersは、RDDのすべての項目で同じ値です。私は間違いなくアクセスする必要がRDDでマップして適用する方法変換中に変換するのを避ける
def getGapSequence(start: Int, length: Int, id: String, k: Int, kmers: RDD[((String, Int), String)]): String ={
var tempStart = start
var totalGap = ""
do{
val tempKmer = kmers.apply((id, start))
if(tempKmer != ""){
totalGap += tempKmer
tempStart += k
}else{
totalGap += 'N'
tempStart += 1
}
}while(totalGap.length < length)
totalGap.take(length)
}
:コードされ
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations;
次のとおりです。問題は、私は私がマッピングしています。この機能でRDD.applyを呼んでいるということですので、私はこのエラーを持っていますそのキーでkmersの項目は、そうでなければ、私は追加する文字列を知りませんので。私はまた、異なるデータ型(すなわち地図、アレイ)にkmersを変換しようとしたが、私が働いているデータのサイズがとても巨大であるので、私は
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 102:102 was 250174590 bytes, which exceeds max allowed: spark.akka.frameSize (16777216 bytes)
を取得します。私はデータを分割することはできません。私は上記の情報が必要です。これをどのようにして達成することができますか?フレームサイズを大きくする必要はありませんか?ありがとうございました。
私のプログラムは、私の他の関数のいくつかがSparkContext(私のmainメソッドで宣言されている)を渡さずにRDDを回る小さなデータセットでうまく動作します。これはこの機能でも機能しませんか?また、これは "複数のSparkContext"問題を引き起こすので、私は提案2を行うことができません。あなたの最初の提案をさらに説明できますか?私のメソッドはすべて一つのオブジェクトに含まれています。ありがとうございました! – Alex