2017-12-09 3 views
2

データセットをキャッシュし、そのデータセットで「N」個のクエリを並列に実行していくつかのメトリックを計算する必要があります。フィルタが変更され、これらのクエリを並行して実行する必要があります。これは、応答時間が重要で、キャッシュするデータセットのサイズが常に1 GB未満になるためです。Sparkを使用してデータセットをキャッシュしてクエリを並列に実行する

私はSparkでデータセットをキャッシュし、その後それをクエリする方法を知っていますが、同じデータセットでクエリを並行して実行する必要がある場合、どうすれば同じ結果が得られますか? alluxioの導入は一つの方法ですが、Sparkの世界でこれを達成できる方法はありますか?

たとえばJavaでは、データをメモリにキャッシュしてから、マルチスレッドを使用して同じことを達成できますが、Sparkでどのように行うのですか?

+0

?あなたは最初に試してから、助けを求めるだけです –

+0

私は質問で言及したように、私はデータセットをキャッシュしてその上でクエリを実行する方法を知っています、私はスパークでは、私はアプローチ/コンセプトを使用することを知っている、私はすでに –

+0

を行っていただろう:クエリは、分散データセットとクエリが連続して実行される並列実行することができます。複数のクエリを並列に実行したい場合は、スレッドの概念を使用する必要があります。 :) –

答えて

2

Scalaの並列コレクションを使用してSparkのドライバコードで並列クエリを実行するのは非常に簡単です。ここでは、このように見える可能性がどのように最小限の例:

val dfSrc = Seq(("Raphael",34)).toDF("name","age").cache() 


// define your queries, instead of returning a dataframe you could also write to a table etc 
val query1: (DataFrame) => DataFrame = (df:DataFrame) => df.select("name") 
val query2: (DataFrame) => DataFrame = (df:DataFrame) => df.select("age") 

// Fire queries in parallel 
import scala.collection.parallel.ParSeq 
ParSeq(query1,query2).foreach(query => query(dfSrc).show()) 

EDIT:

クエリIDを収集し、あなたがそうするべきマップをもたらすために:あなたがこれまでに試してみました何

val resultMap = ParSeq(
(1,query1), 
(2,query2) 
).map{case (queryId,query) => (queryId,query(dfSrc))}.toMap 
+0

素晴らしい、ありがとう、これは何を探しているのか分からなかったそれらのことについて勉強します。 –

+0

クエリが常に出力として単一の値を返す場合、どのようにしてキーがquery_idを表し、valueがクエリ出力(単一値)を表すMapにすべてのクエリの出力を集めることができるか考えてみてください。 –

+0

@RajivChodisetti foreach、例えばtoMap' –

関連する問題