2016-07-11 4 views
-1

RDDに次の関数をマップして、各要素のstart、length、およびidから渡します。 kとkmersは、RDDのすべての項目で同じ値です。私は間違いなくアクセスする必要がRDDでマップして適用する方法変換中に変換するのを避ける

def getGapSequence(start: Int, length: Int, id: String, k: Int, kmers: RDD[((String, Int), String)]): String ={ 
    var tempStart = start 
    var totalGap = "" 
    do{ 
    val tempKmer = kmers.apply((id, start)) 
    if(tempKmer != ""){ 
     totalGap += tempKmer 
     tempStart += k 
     }else{ 
     totalGap += 'N' 
     tempStart += 1 
     } 
    }while(totalGap.length < length) 

    totalGap.take(length) 
} 

:コードされ

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations;

次のとおりです。問題は、私は私がマッピングしています。この機能でRDD.applyを呼んでいるということですので、私はこのエラーを持っていますそのキーでkmersの項目は、そうでなければ、私は追加する文字列を知りませんので。私はまた、異なるデータ型(すなわち地図、アレイ)にkmersを変換しようとしたが、私が働いているデータのサイズがとても巨大であるので、私は

org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 102:102 was 250174590 bytes, which exceeds max allowed: spark.akka.frameSize (16777216 bytes)

を取得します。私はデータを分割することはできません。私は上記の情報が必要です。これをどのようにして達成することができますか?フレームサイズを大きくする必要はありませんか?ありがとうございました。

答えて

0

パラメータとしてRDDを呼び出すことが正しく機能しません。あなたはSparkContextのためにメインのアプリケーションコードであなたのRDDを呼び出す必要があります。

このRDDには、SparkContextのリファレンスがありません。それを解決するには2つの方法があります。

  1. このコードをメインアプリで実行します。これはうまくいくでしょう。
  2. SparkContextをグローバル変数に変更し、パラメータ内でRDDを設定せず、このRDDを関数内に設定します。パラメータは、必要なファイルへのパスでなければなりません。
+0

私のプログラムは、私の他の関数のいくつかがSparkContext(私のmainメソッドで宣言されている)を渡さずにRDDを回る小さなデータセットでうまく動作します。これはこの機能でも機能しませんか?また、これは "複数のSparkContext"問題を引き起こすので、私は提案2を行うことができません。あなたの最初の提案をさらに説明できますか?私のメソッドはすべて一つのオブジェクトに含まれています。ありがとうございました! – Alex

関連する問題