2016-03-18 5 views
0

私はSpark Streamingアプリケーションを使用して、カフカブローカーからのイベントを分析しています。私は以下のようなルールを持っているし、新しいルールは既存のものを組み合わせることによって生成することができます。スパークストリーミング再生

If this event type occurs raise an alert. 
If this event type occurs more than 3 times in a 5-minute interval, raise an alert. 

並行して、私はカサンドラにすべての受信データを保存します。私がしたいことは、カッサンドラの歴史的なデータのためにこのストリーミングアプリを実行することです。たとえば、

<This rule> would have generated <these> alerts for <last week>. 

スパークでこれを行う方法はありますか、それともロードマップにありますか?たとえば、Apache Flinkにはイベント時間処理があります。しかし、既存のコードベースをそれに移行するのは難しいようで、私は既存のコードを再利用することでこの問題を解決したいと思います。

答えて

0

これはかなり簡潔で、いくつかの注意点があります。まず、これがカフカ側からどのように作用するかを理解するのに役立ちます。

カフカは、オフセットと呼ばれるものを管理します。カフカの各メッセージには、パーティション内の位置に対するオフセットがあります。パーティションの最初のメッセージは、オフセットが0Lであり、2つ目が1Lなどです。それ以外は、ログのロールオーバーとトピック圧縮のため、0Lは必ずしも最も古いオフセットではありません。パーティション。

最初に行う必要があるのは、最初から読み込みたいすべてのパーティションのオフセットを収集することです。ここでは、これを行う機能があります:

def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = { 
    val time = kafka.api.OffsetRequest.LatestTime 
    val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo](
    (new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000)) 
) 
    val req = new kafka.javaapi.OffsetRequest(
    reqInfo, kafka.api.OffsetRequest.CurrentVersion, "test" 
) 
    val resp = consumer.getOffsetsBefore(req) 
    val offsets = resp.offsets(topic, partition) 
    (offsets(offsets.size - 1), offsets(0)) 
} 

あなたはこのようにそれを呼び出します。あなたはカフカからのオフセットを取得について知りたいことすべて、read thisについては

val (firstOffset,nextOffset) = getOffsets(consumer, "MyTopicName", 0) 

。最小限に抑えるためにはわかりません。 (あなたが完全例えばPartitionOffsetRequestInfoの二番目の引数を、理解したときに私に教えてください。)

今、あなたはあなたが歴史的に見てみたいパーティションのfirstOffsetlastOffsetを持っていることを、あなたはそれからである、createDirectStreamfromOffsetパラメータを使用しますタイプ:fromOffset: Map[TopicAndPartition, Long]getOffsets()から得たfirstOffsetLong /値を設定します。

nextOffsetについては、これを使用して、履歴データの処理から新しいデータへの移行時にストリーム内で判断できます。 msg.offset == nextOffsetの場合、パーティション内の最初の非履歴レコードを処理しています。注意事項については、今

、直接from the documentation:コンテキストが開始された後

  • 、新たなストリーミング計算は を設定したり、それに追加することができます。
  • コンテキストが停止すると、 を再起動できません。
  • 同じ時刻に のJVMでアクティブにできるStreamingContextは1つだけです。
  • StreamingContextのstop()もSparkContextを停止します。 にStreamingContextのみを停止するには、stopSparkContextというオプションのstop() のパラメータをfalseに設定します。
  • 次のStreamingContextが作成される前に、 StreamingContextが(SparkContextを停止せずに)停止されている限り、SparkContextを に再作成して複数のStreamingContextを作成することができます。

それはので、私はfirstOffsetと同時にnextOffsetをつかむこれらの警告のだ - 私は、ストリームのアップを維持するが、それに歴史的提示時間処理からコンテキストを変更することができます。

+0

詳細な回答ありがとうございます。私はそれを読んで、それをよく理解するためにそれを再び読むでしょう。一方、Sparkの「スライディングウィンドウ」を使用して時間をカウントします。どうすればこれを達成できますか? – yolgun

関連する問題