2016-05-10 10 views
4

RDBMSからHDFSにデータを転送する必要がある場合に取り組んでいます。 sqoopを使用してこのケースのベンチマークを行い、6〜7分で約20GBのデータを転送できることが分かりました。RDBMSからhdfsにデータを転送する際のApache Spark-SQLとSqoopのベンチマーク

Spark SQLで同じことをしようとすると、パフォーマンスは非常に低くなります(レコードの1GbがNetezzaからhdfsに転送するのに4分かかる)。チューニングを行い、パフォーマンスを向上させようとしていますが、1平方インチ(約3Gbのデータで約1分)のレベルにチューニングする可能性は低いです。

sparkは主に処理エンジンですが、私の主な質問は、sparkとsqoopの両方が内部的にJDBCドライバを使用していることです。なぜ性能に大きな差があるのでしょうか? 。私はここに私のコードを掲載しています。

object helloWorld { 
    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("Netezza_Connection").setMaster("local") 
    val sc= new SparkContext(conf) 
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 
    sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("numPartitions","14").option("lowerBound","0").option("upperBound","13").option("partitionColumn", "id").option("fetchSize","100000").load().registerTempTable("POC") 
    val df2 =sqlContext.sql("select * from POC") 
    val partitioner= new org.apache.spark.HashPartitioner(14) 
    val rdd=df2.rdd.map(x=>(String.valueOf(x.get(1)),x)).partitionBy(partitioner).values 
    rdd.saveAsTextFile("hdfs://Hostname/test") 
    } 
} 

私は他の多くの記事をチェックしているが、sqoopの内部作業とチューニングのための明確な答えを得ることができませんでしたも、私はこの問題を理解する上で.KindlyヘルプをベンチマークスパークSQL対sqoopました。

答えて

2

あなたは仕事のための間違ったツールを使用しています。

Sqoopは、データベース(num-mapperを参照)にそれぞれ接続する一連のプロセス(データノード上)を起動し、それぞれデータセットの一部を抽出します。私は、あなたがスパークと並行して読みやすさを達成できるとは思わない。

データセットをSqoopで取得し、Sparkで処理します。

2

次試すことができます - 任意のパーティションなしで100万に増加したfetch_sizeとネティーザから

  1. 読み出したデータを。

    sqlContext.read.format("jdbc").option("url","jdbc:netezza://hostname:port/dbname").option("dbtable","POC_TEST").option("user","user").option("password","password").option("driver","org.netezza.Driver").option("fetchSize","1000000").load().registerTempTable("POC") 
    
  2. 最終ファイルに書き込む前にデータを再分割します。

    val df3 = df2.repartition(10) //to reduce the shuffle 
    
  3. ORCフォーマットはTEXTよりも最適化されています。 parquet/ORCに最終出力を書き出します。

    df3.write.format("ORC").save("hdfs://Hostname/test") 
    
0

@amitabh 答えとしてマークされていますが、私はそれに同意しません。

jdbcから読み込み中にデータを分割する述部を与えると、sparkはパーティションごとに別々のタスクを実行します。あなたのケースでは、タスクは14(あなたは火花UIを使用してこれを確認することができます)でなければなりません。

私は、ローカルでマスターを使用していることに気付きました。これはエグゼキュータのコアが1つしかないためです。したがって、並列性はありません。あなたの場合は何が起こっているのですか?

これで、sqoopと同じスループットを得るには、これらのタスクが並行して実行されていることを確認する必要があります。理論的に、これはいずれかによって行うことができる。典型的には14個のコア(スペクトルの他端)

と1つのエグゼキュータを使用して1つのコアと14件のエグゼキュータ各 2を用い 1、Iはエグゼキュータ当たり4~5のコアとなるだろう。そこで、15/5 = 3のエグゼキュータでパフォーマンスをテストします(クラスタモードで動作するドライバの場合は1コアを1から14に追加しました)。 設定:sparkConf.setのexecutor.cores、executor.instancesを使用してconfigsを再生します。

これでパフォーマンスが大幅に向上しない場合は、次に実行プログラムのメモリを確認します。

最後に、アプリケーションロジックを調整してmapRDDサイズ、パーティションサイズ、シャッフルサイズを確認しました。

+0

私を助けた: - あなたのコメントをありがとう、私はここに私の会社の糸のURLを投稿することができませんでしたbecoz ..私は私のコードでは、「ローカル」としてマスターを与えています。..実際に私は糸のクラスターでこれを実行していました。また、14の並列性は、hdfsにデータを書き込んでいる間ではなく、読み込み中ではなく、達成されています。読み込み中に、全体の処理が非常に遅くなっているSQLデータベースから読み取るスレッドは1つだけです。この場合、私はマルコポーロの答えが正しいと思います。これは私の見解です。私が何か不足している場合に備えて私を修正してください。ありがとう。 –

+0

ジョブに割り当てられているエグゼクティブはいくつですか?あなたは火花のUIを使用して確認できますか? – bigdatamann

0

以下のソリューションは、


var df=spark.read.format("jdbc").option("url"," "url").option("user","user").option("password","password").option("dbTable","dbTable").option("fetchSize","10000").load() df.registerTempTable("tempTable") var dfRepart=spark.sql("select * from tempTable distribute by primary_key") //this will repartition the data evenly

dfRepart.write.format("parquet").save("hdfs_location")

関連する問題