2016-04-19 4 views
0

スパークストリーミングのアウトオブボックス結合機能は、実際の使用の多くのケースを保証するものではありません。その理由は、マイクロバッチRDDに含まれるデータだけを結合するからです。SPARK - 2つのデータストリームの結合 - キャッシュのメンテナンス

2つのカフカストリームのデータを結合し、stream1の各オブジェクトをstream2の対応するオブジェクトでスパークさせて豊かにし、それをHBaseに保存します。

実装

  • のようにオブジェクトを追加または交換、ストリーム2のオブジェクトからメモリ内のデータセットを維持し、それらはストリーム1内のすべての要素に対して

  • を受け取っている場合、検索しキャッシュにアクセスします一致した場合はHBaseに保存し、一致しない場合はカフカストリームに戻します。

この質問は、スパークストリーミングの調査と上記の実装方法を見つけるAPIです。

+0

質問は? – maasg

+0

最後の行にクエリーを入れてください。それが今あなたにとって意味をなさないかどうか確認してください。 – user3840810

答えて

0

:基本的にはあなたが何かを埋める「累計」RDDを保ちます。これはupdateStateByKeyの方が効率的です。これらはタイプKのいくつかのキーで識別されているので、ストリーム2タイプVのあなたのオブジェクトを想定し、PairDStreamFunction上で定義されている、あなたの最初のポイントは次のように行くだろう:

def stream2: DStream[(K, V)] = ??? 

def maintainStream2Objects(key: K, value: Option[V], state: State[V]): (K, V) = { 
    value.foreach(state.update(_)) 
    (key, state.get()) 
} 

val spec = StateSpec.function(maintainStream2Objects) 

val stream2State = stream2.mapWithState(spec) 

stream2Stateは今、各バッチが(K, V)を含むストリームであります各キーの最新の値とペアにします。このストリームで結合を行い、stream1を使用して、2番目のポイントのさらなるロジックを実行することができます。

+0

それでは、例えば24時間などの後に、古いデータをどのように「期限切れにする」のですか? –

1

RDDを他のRDDに参加させることができます。マイクロバッチ内のものだけでなく、RDDも参加できます。良いスタートがmapWithStateに見えることであろう

var globalRDD1: RDD[...] = sc.emptyRDD 
var globalRDD2: RDD[...] = sc.emptyRDD 

dstream1.foreachRDD(rdd => if (!rdd.isEmpty) globalRDD1 = globalRDD1.union(rdd)) 
dstream2.foreachRDD(rdd => if (!rdd.isEmpty) { 
    globalRDD2 = globalRDD2.union(rdd)) 
    globalRDD1.join(globalRDD2).foreach(...) // etc, etc 
} 
+0

お返事ありがとうございます。あなたが言及したことは、 "キャッシュ" RDDを最新の状態に保つでしょう。しかし、毎回RDDを継続して放送することは費用がかかるだろう。それについての考えは?別の方法がありますか? – user3840810

+0

ブロードキャストはどういう意味ですか?私のコードには「放送」はありません。 –

+0

別の単語を使用していて、「共有変数」のようにブロードキャストを意味しませんでした。 globalRDD1.join(globalRDD2)を毎回実行すると、ドライバからすべてのデータをエグゼキュータに送信します。以前のマイクロバッチで以前に送信されたデータであっても、すでに処理されている可能性があります。再度あなたの応答に感謝します。 – user3840810

関連する問題