2016-04-24 7 views
0

大規模なテキストファイルを保存しようとしました。 5ギガバイト大きいファイルを保存するとframeLimitを超えます

sc.parallelize(cfile.toString() 
    .split("\n"), 1) 
    .saveAsTextFile(new Path(path+".cs", "data").toUri.toString) 

が、私は私が今の年齢のためにここにこだわって

java.io.IOException: Broken pipe 
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method) 
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) 
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) 
    at sun.nio.ch.IOUtil.write(IOUtil.java:65) 
... 
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6 
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542) 
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538) 
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 

を得続けます。誰でもここで私を助けて、私がテキストファイルとしてcfileをどのように保存できるか説明できますか?


スタンドアロン/ローカル/ヤーンクラスタ?

  • 糸クラスタ

メモリ/コア設定?パーティションの

  • 1,8 TB
  • 285個のコア

数?

パーティションの数を設定するためのコードの懸念行:

val model = word2vec 
    .setMinCount(minCount.asInstanceOf[Int]) 
    .setVectorSize(arguments.getVectorSize) 
    .setWindowSize(arguments.getContextWindowSize) 
    .setNumPartitions(numW2vPartitions) 
    .setLearningRate(learningRate) 
    .setNumIterations(arguments.getNumIterations) 
    .fit(wordSequence) 

引数を火花提出私は現在1にパーティションの数を設定しています

  • spark-submit --master yarn 
          --deploy-mode cluster 
          --driver-memory 20G 
          --num-executors 5 
          --executor-cores 8 
          --driver-java-options "-Dspark.akka.frameSize=2000" 
          --executor-memory 20G --class 
    

答えて

0

スタンドアロン/ローカル/ヤーンクラスター? メモリ/コア設定? パーティションの数?

あなたのエラーは、おそらく私は、なぜあなたはこれをやっているかわからない

(OOMキラーはそれを殺したかもしれないか、それはいくつかのOOMのエラーを得た)労働者のいずれかがなくなっていることを症状:cfile.toString()。 split( "\ n") - これから私は5GBのコンテンツをすべてメモリに保持し、それを並列化しようとしていると思いますか?明らかに最適ではありません。 関連する可能性があるもう1つの問題 - ドライバが何とか5GBのメモリをすべて保持できる場合でも、ドライバーワーカー間のすべてのネットワークレイヤはこの量のデータを気に入らないため、パーティションに分割するようアドバイスします。

代わりに、sc.textFile(..)でファイルを読み取り、新しいパスに保存することができます。 sc.textFile(..)。repartition(100)を使用して、テキストファイルのパーティション数を制御することもできます。

+0

実際に私は単語ベクトルモデルを保存しようとしています。[スパークバージョン](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark)です。 /mllib/feature/Word2Vec.scala#L625)の 'save()'メソッドは単純に機能しません。理由はわかりません。私はユーザーグループに連絡しましたが、それ以降は答えはありませんでした。これが私のモデルを単にCSVファイルとして保存して、一見壊れた実装を回避することができたと思った理由です。私はあなたの要求された情報で私の質問を更新します。 – displayname

+0

ドライバプログラムは実際には20GBのRAMを搭載していますが、現在使用している8人の作業者ごとに同じです。 – displayname

+0

それでは、どのようにモデルをcfileに変換しましたか?私は、モデルがある種のrddだと仮定し、テキストとして保存したいので、このrddを文字列のrddに変換することができます(おそらくドライバにすべてを渡すことなく)。そしてhdfsに保存しますまた、parrallelで –

関連する問題