spark SQLクエリに使用される2つのスパークRDD、dataRDDおよびnewPairDataRDDがあります。 私のアプリケーションが起動すると、dataRDDが初期化されます。 1つの指定されたhbaseエンティティ内のすべてのデータがdataRDDに格納されます。スパークRDDユニオンでは非常に遅い
クライアントのSQLクエリが来たら、私のAPPは新しい更新をすべて取得し、newPairDataRDDに挿入します。 dataRDD共用体newPairDataRDDおよびspark SQL文の表として登録します。
dataRDDに0レコード、newPairDataRDDに1レコードが新たに挿入されました。組合には4秒かかります。あまりにも遅いです
私はそれが妥当ではないと思います。それをもっと速くする方法は誰でも知っていますか?おかげ 簡単なコードスパークのWeb UIから
// Step1: load all data from hbase to dataRDD when initial, this only run once.
JavaPairRDD<String, Row> dataRDD= getAllBaseDataToJavaRDD();
dataRDD.cache();
dataRDD.persist(StorageLevel.MEMORY_ONLY());
logger.info(dataRDD.count());
// Step2: when spark sql query coming, load latest updated and inserted data from db to newPairDataRDD
JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD();
// Step3: if count>0 do union and reduce
if(newPairDataRDD.count() > 0) {
JavaPairRDD<String, Row> unionedRDD =dataRDD.union(newPairDataRDD);
// if data was updated in DB, need to delete the old version from the dataRDD.
dataRDD = unionedRDD.reduceByKey(
new Function2<Row, Row, Row>() {
// @Override
public Row call(Row r1, Row r2) {
return r2;
}
});
}
//step4: register the dataRDD
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD..values(), schema);
//step5: execute sql query
retRDD = sqlContext.sql(sql);
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
以下のように、私は以下を参照することができます。シャッフル読むシャッフル
6を書く成功/総入力SparkPlan.scalaに集まる:85の+詳細1/4どうやらそれは組合
完成した段階のための4S(8)
StageId説明提出期間のタスクが必要/ 2015年8時17分2秒8日 - 8月156.0 B
7組合SparkSqlQueryForMarsNew.java:389+details 2015年1月4日8時17分4秒8日 - 8月64.0 B 156.0 B