2016-11-24 3 views
0

例えば、Apache Sparkを使用してRAMに文字列を格納することが可能かどうかを知りたいと思います。実際、Apache Sparkが扱っている新しい入力データに応じて、これらの文字列を照会して更新したいと考えています。さらに可能であれば、ノードは他のすべてのノードにどの文字列が格納されているかを通知できますか? プロジェクターに関する情報が必要な場合は、お気軽にお問い合わせください。Apache SparkでRAMにデータを保存できますか?

J

+0

質問には、Sparkはデータベースではありません...かなり広く、エンジンまたは他のストレージであれ、計算エンジンです。データグリッド(Apache Ignite、Hazelcast、Oracle Coherenceなど)を考慮すると、データを更新するオプションを備えたRAM内ストレージです。スパークはデータをロードし、処理し、ハイブテーブルに保存することもできます。現在のところ、ユースケースに関する情報はあまりありません –

+0

Sparkはデータベースではありません。実際には、私はRAMに以前に格納された情報を使って将来のデータに対して数学的演算を行うために、現在処理されているデータに関するいくつかの情報を保存したいと思います... Hiveテーブルに関しては、各ノードに独立したテーブル、またはすべてのノードに共通のテーブルですか?それはどこに保管されていますか? –

+0

ワークフローがある場合:load - > transform - > transform - > etc - >結果を書き込むと、データをDataFrameに読み込み、RAMにキャッシュしてから計算を実行できます。あなたはデータを読み込むオプションがあります - 私がSpark 2.0から知っている限り - それを更新してください –

答えて

1

はい、あなたは、ステートフルなストリーミング機能mapWithStateが必要です。この機能を使用すると、ストリーミングバッチ全体でメモリにキャッシュされた状態を更新できます。

まだチェックポイントを設定していない場合は、チェックポイントを有効にする必要があります。

Scalaの使用例:

def stateUpdateFunction(userId: UserId, 
         newData: UserAction, 
         stateData: State[UserSession]): UserModel = { 
    val currentSession = stateData.get() // Get current session data 
    val updatedSession = ...   // Compute updated session using newData 
    stateData.update(updatedSession)   // Update session data  
    val userModel = ...     // Compute model using updatedSession 
    return userModel    // Send model downstream 
} 

// Stream of user actions, keyed by the user ID 
val userActions = ... // stream of key-value tuples of (UserId, UserAction) 
// Stream of data to commit 
val userModels = userActions.mapWithState(StateSpec.function(stateUpdateFunction)) 

https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html

のJavaの使用例:

// Update the cumulative count function 
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc = 
    new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() { 
     @Override 
     public Tuple2<String, Integer> call(String word, Optional<Integer> one, 
      State<Integer> state) { 
     int sum = one.orElse(0) + (state.exists() ? state.get() : 0); 
     Tuple2<String, Integer> output = new Tuple2<>(word, sum); 
     state.update(sum); 
     return output; 
     } 
    }; 

// DStream made of get cumulative counts that get updated in every batch 
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream = 
    wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD)); 

https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java行90:

+0

ありがとうImDarrenG、Javaに例があるかどうか知っていますか? –

+1

私はJavaの例を含めるように答えを更新しました – ImDarrenG

+0

ありがとうImDarrenG、私は完全にmapWithMapの使い方を理解しています。しかし、私は質問があります。私はこのjson行{"device": "dv1"、 "parameter1": "vv1"、 "parameter2": "vv2"}を持っているとします。 次の例では、jsonを解析するコードを使用して、時間dv1が表示されます。表示される結果は、たとえば(dv1、51)です。 出力に入るために結果をjson行に含めたいとします: {"device": "dv1"、 "parameter1": "vv1"、 "parameter2": "vv2"、 "increment" :51} これをどのように達成できるか考えていますか?前のコードでどうやって作るのか分かりません。 –

関連する問題