2015-01-05 21 views
5

spark SQLクエリに使用される2つのスパークRDD、dataRDDおよびnewPairDataRDDがあります。 私のアプリケーションが起動すると、dataRDDが初期化されます。 1つの指定されたhbaseエンティティ内のすべてのデータがdataRDDに格納されます。スパークRDDユニオンでは非常に遅い

クライアントのSQLクエリが来たら、私のAPPは新しい更新をすべて取得し、newPairDataRDDに挿入します。 dataRDD共用体newPairDataRDDおよびspark SQL文の表として登録します。

dataRDDに0レコード、newPairDataRDDに1レコードが新たに挿入されました。組合には4秒かかります。あまりにも遅いです

私はそれが妥当ではないと思います。それをもっと速くする方法は誰でも知っていますか?おかげ 簡単なコードスパークのWeb UIから

// Step1: load all data from hbase to dataRDD when initial, this only run once. 
    JavaPairRDD<String, Row> dataRDD= getAllBaseDataToJavaRDD(); 
    dataRDD.cache(); 
    dataRDD.persist(StorageLevel.MEMORY_ONLY()); 
    logger.info(dataRDD.count()); 

    // Step2: when spark sql query coming, load latest updated and inserted data from db to newPairDataRDD 

    JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD(); 
    // Step3: if count>0 do union and reduce 

     if(newPairDataRDD.count() > 0) { 

     JavaPairRDD<String, Row> unionedRDD =dataRDD.union(newPairDataRDD); 

    // if data was updated in DB, need to delete the old version from the dataRDD. 

     dataRDD = unionedRDD.reduceByKey(
      new Function2<Row, Row, Row>() { 
      // @Override 
      public Row call(Row r1, Row r2) { 
      return r2; 
      } 
      }); 
    } 
//step4: register the dataRDD 
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD..values(), schema); 

//step5: execute sql query 
retRDD = sqlContext.sql(sql); 
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect(); 

以下のように、私は以下を参照することができます。シャッフル読むシャッフル

6を書く成功/総入力SparkPlan.scalaに集まる:85の+詳細1/4どうやらそれは組合

完成した段階のための4S(8)

StageId説明提出期間のタスクが必要/ 2015年8時17分2秒8日 - 8月156.0 B

7組合SparkSqlQueryForMarsNew.java:389+details 2015年1月4日8時17分4秒8日 - 8月64.0 B 156.0 B

答えて

1

より効率的にあなたが望むものを達成する方法はを使用することですとflatMapValues()の場合は、dataRDDに新しいパーティションを追加することを除いて、ユニオンを使用することはほとんどありません。つまり、reduceByKey()の前にすべてのデータをシャッフルする必要があります。 cogroup()flatMapValues()は、newPairDataRDDの再パーティション化を引き起こします。

JavaPairRDD<String, Tuple2<List<Row>, List<Row>>> unionedRDD = dataRDD.cogroup(newPairDataRDD); 
JavaPairRDD<String, Row> updated = unionedRDD.flatMapValues(
    new Function<Tuple2<List<Row>, List<Row>>, Iterable<Row>>() { 
     public Iterable<Row> call(Tuple2<List<Row>, List<Row>> grouped) { 
      if (grouped._2.nonEmpty()) { 
       return grouped._2; 
      } else { 
       return grouped._1; 
      } 
     } 
    }); 

またはスカラ

val unioned = dataRDD.cogroup(newPairDataRDD) 
val updated = unioned.flatMapValues { case (oldVals, newVals) => 
    if (newVals.nonEmpty) newVals else oldVals 
} 

免責事項は、私はJavaで火花を書くことに慣れていません!上記の内容が間違っていると、誰かが私を修正してください!

0

あなたRDDSを再分割してみてください。

JavaPairRDD unionedRDD = dataRDD.repartition(sc.defaultParallelism * 3).union(newPairDataRDD.repartition(sc.defaultParallelism * 3));