例えば、Apache Sparkを使用してRAMに文字列を格納することが可能かどうかを知りたいと思います。実際、Apache Sparkが扱っている新しい入力データに応じて、これらの文字列を照会して更新したいと考えています。さらに可能であれば、ノードは他のすべてのノードにどの文字列が格納されているかを通知できますか? プロジェクターに関する情報が必要な場合は、お気軽にお問い合わせください。Apache SparkでRAMにデータを保存できますか?
J
例えば、Apache Sparkを使用してRAMに文字列を格納することが可能かどうかを知りたいと思います。実際、Apache Sparkが扱っている新しい入力データに応じて、これらの文字列を照会して更新したいと考えています。さらに可能であれば、ノードは他のすべてのノードにどの文字列が格納されているかを通知できますか? プロジェクターに関する情報が必要な場合は、お気軽にお問い合わせください。Apache SparkでRAMにデータを保存できますか?
J
はい、あなたは、ステートフルなストリーミング機能mapWithState
が必要です。この機能を使用すると、ストリーミングバッチ全体でメモリにキャッシュされた状態を更新できます。
まだチェックポイントを設定していない場合は、チェックポイントを有効にする必要があります。
Scalaの使用例:
def stateUpdateFunction(userId: UserId,
newData: UserAction,
stateData: State[UserSession]): UserModel = {
val currentSession = stateData.get() // Get current session data
val updatedSession = ... // Compute updated session using newData
stateData.update(updatedSession) // Update session data
val userModel = ... // Compute model using updatedSession
return userModel // Send model downstream
}
// Stream of user actions, keyed by the user ID
val userActions = ... // stream of key-value tuples of (UserId, UserAction)
// Stream of data to commit
val userModels = userActions.mapWithState(StateSpec.function(stateUpdateFunction))
のJavaの使用例:
// Update the cumulative count function
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> call(String word, Optional<Integer> one,
State<Integer> state) {
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
}
};
// DStream made of get cumulative counts that get updated in every batch
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
ありがとうImDarrenG、Javaに例があるかどうか知っていますか? –
私はJavaの例を含めるように答えを更新しました – ImDarrenG
ありがとうImDarrenG、私は完全にmapWithMapの使い方を理解しています。しかし、私は質問があります。私はこのjson行{"device": "dv1"、 "parameter1": "vv1"、 "parameter2": "vv2"}を持っているとします。 次の例では、jsonを解析するコードを使用して、時間dv1が表示されます。表示される結果は、たとえば(dv1、51)です。 出力に入るために結果をjson行に含めたいとします: {"device": "dv1"、 "parameter1": "vv1"、 "parameter2": "vv2"、 "increment" :51} これをどのように達成できるか考えていますか?前のコードでどうやって作るのか分かりません。 –
質問には、Sparkはデータベースではありません...かなり広く、エンジンまたは他のストレージであれ、計算エンジンです。データグリッド(Apache Ignite、Hazelcast、Oracle Coherenceなど)を考慮すると、データを更新するオプションを備えたRAM内ストレージです。スパークはデータをロードし、処理し、ハイブテーブルに保存することもできます。現在のところ、ユースケースに関する情報はあまりありません –
Sparkはデータベースではありません。実際には、私はRAMに以前に格納された情報を使って将来のデータに対して数学的演算を行うために、現在処理されているデータに関するいくつかの情報を保存したいと思います... Hiveテーブルに関しては、各ノードに独立したテーブル、またはすべてのノードに共通のテーブルですか?それはどこに保管されていますか? –
ワークフローがある場合:load - > transform - > transform - > etc - >結果を書き込むと、データをDataFrameに読み込み、RAMにキャッシュしてから計算を実行できます。あなたはデータを読み込むオプションがあります - 私がSpark 2.0から知っている限り - それを更新してください –