2016-09-29 14 views
6

私はS3からデータセットをロードする必要があるアプリケーションを構築しています。機能は正常に動作していますが、パフォーマンスは驚くほど遅いです。S3からのCSVファイルをSparkデータフレームに読み込むのが遅いと予想されていますか?

データセットはCSV形式です。各ファイルには約7Mのレコード(行)があり、各ファイルは600〜700MBです。

val spark = SparkSession 
     .builder() 
     .appName("MyApp") 
     .getOrCreate() 

val df = spark 
    .read 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .csv(inFileName:_*) 
    // inFileName is a list that current contains 2 file names 
    // eg. s3://mybucket/myfile1.csv 

val r = df.rdd.flatMap{ row => 
     /* 
     * Discard poorly formated input records 
     */ 
     try { 
     totalRecords.add(1) 

     // this extracts several columns from the dataset 
     // each tuple of indexColProc specifies the index of the column to 
     // select from the input row, and a function to convert 
     // the value to an Int 
     val coords = indexColProc.map{ case (idx, func) => func(row.get(idx).toString) } 

     List((coords(0), coords)) 
     } 
     catch { 
     case e: Exception => {  
      badRecords.add(1) 
      List() 
     } 
     } 
    } 

println("Done, row count " + r.count) 

これは、5台のマシンのAWSクラスタで、それぞれm3.xlargeで実行しました。 maximizeResourceAllocationパラメーターはtrueに設定され、これはクラスター上で実行されている唯一のアプリケーションでした。

私はアプリケーションを2回実行しました。 S3のファイルを指す 'inFileName'と、hadoopファイルシステムのファイルのローカルコピーを指しているのは、初めてです。

私はSparkヒストリサーバーを見て、最後のr.countアクションに対応するジョブにドリルダウンすると、s3上のファイルにアクセスするのに2.5分かかります。 hdfs。私は、「私は、ローカル設定=小さいクラスタ上またはマスターで同じ実験を実行したときに比例し、同様の結果を得まし。

私は

aws s3 cp <file> 

を使用してクラスタにS3のファイルをコピーするとそれだけで6.5sかかります

s3にアクセスするとこのようなパフォーマンスが低下しますか?そうでない場合、このようなパフォーマンスは低下しますか?そうでない場合は、誰かが私が間違っている場所を指摘してもらえますか?それが予想される場合は、パフォーマンスを向上させるためにこれを行う他の方法がありますか?アプリケーションを実行する前にs3からhdfsにファイルを単純にコピーするようなものを開発しますか?

+0

'flatMap'を実行する前に' df.cache() 'を試してみてください/ – maxymoo

+0

私はそれを試しましたが、効果がなかったか、パイプラインがハングアップしました(私は試したときに別のマシンインスタンスでしたその実験)。 –

答えて

6

さらに掘り下げた後、S3ネイティブを使用すると大きな違いが生じることがわかりました。私はURIプレフィックスをs3n://に変更しました。問題のジョブのパフォーマンスは2.5分から21秒に下がりました。だから、s3とhdfsのアクセスには3秒のペナルティしかありません。これはかなり妥当です。

このトピックを検索すると、s3nには最大ファイルサイズの上限が5GBであることが記載されています。しかし、thisには、最大ファイルサイズの制限がHadoop 2.4.0で5TBに増加したというメッセージがありました。

"S3ブロックファイルシステムの使用は推奨されなくなりました。"

0

spark-csvパッケージを試しましたか? csvを読むための最適化がたくさんあり、mode = MALFORMEDを使ってフィルタリングしようとしている不良行を削除することができます。

csv_rdf<- read.df(sqlContext,"s3n://xxxxx:[email protected]/file1.csv",source="com.databricks.spark.csv") 

もっと私たちが問題だったので、我々のデータは1TBとした以外は、数ヶ月ほど前に、まったく同じ問題に直面して、ここでhttps://github.com/databricks/spark-csv

+0

私は過去にspark-csvを試しましたが、パフォーマンスに違いはありませんでした。しかし、mode = MALFORMEDのポインタに感謝します。私はそのモードについて知らなかったし、私のコードを簡素化するのに役立つかもしれない。ありがとう –

0

詳細を見つけることができます:あなたは、直接このようなS3から読み取ることができますもっと発音される。 ステージがスケジュールされるたびに、それぞれ30人のエグゼキュータを持つ5つのインスタンスが存在していたので(S3でデータを取得する作業が最初に実行される)、そのようになりました。これらのタスクはネットワーク帯域幅でボトルネックになり、それらはすべてタスクの一部を計算するために移動し、同時にCPUを争う可能性があります。

基本的に、タスクはすべて同時に同じことをしているので、常に同じリソースに対して競合しています。

kタスクの数を任意の時点で許可すると、すぐにダウンロードが完了し、計算部分に移動し、kタスクの次のセットが入ってダウンロードが開始されることがわかりました。このように、今のところk(すべてではなく)のタスクは完全な帯域幅を得ており、いくつかのタスクはいくつかの共通リソースでお互いに待たずにCPUまたはI/O上で有用なことを同時に実行しています。

これが役に立ちます。

+0

試してみる価値のある音。あなたはタスク実行のそのような細かい制御の種類を実装する方法のいくつかの指針を提供できますか?私の場合、執行者はわずか8人で、それぞれに約4つのコアがあります。現在、私はm3.xlargeで実験を行っていますが、このI/Oボトルネックを乗り越えることができれば、より多くの物理的なコアを持つインスタンスに移行することができます。 –

+0

私は公に利用可能なコードを持っていませんが、私たちが本質的にしたのは、ネットワーク集中型コールに入る前に各タスク内で、同じブロック内に他のタスクがどれくらいあるかを把握し、いくつかのkより小さい場合は、このタスクに忙しいスピンを1秒間実行して再試行するように依頼することができます。そして、ネットワークコールでタスクが完了すると、セントラルサービスを更新します。確かに最もエレガントなものではありませんが、90分から50分までの作業時間を短縮できました。 –

+0

もう少し掘り下げた後、私はS3ネイティブを使うと大きな違いがあることを発見しました。私はURIプレフィックスをs3n://に変更しました。問題のジョブのパフォーマンスは2.5分から21秒に下がりました。だから、s3とhdfsのアクセスには3秒のペナルティしかありません。これはかなり妥当です。あなたの問題にこれが適用されるかどうかはわかりませんが、s3ネイティブを使用しているときのファイルサイズの上限は5GBです。 –

関連する問題