2016-08-29 2 views
2

私はScalaを使ってApache Flinkを使い始めました。誰かが私が持っている現在のデータストリームから遅れたストリーム(kイベントまたはk単位の時間差がある)を作成する方法を教えてもらえますか?Apache Flink:遅れたDatastreamを作成する

基本的には、データストリームに自動回帰モデル(ストリームのリニア回帰とタイムラグのあるバージョン)を実装したいと考えています。したがって、以下の擬似コードに類似した方法が必要です。

val ds : DataStream = ... 

val laggedDS : DataStream = ds.map(lag _) 

def lag(ds : DataStream, k : Time) : DataStream = { 

} 

すべてのイベントが1秒間隔で間隔があり、2秒遅れがある場合は、サンプル入力と出力がこのようになると思います。

入力:1、2、3、4、5、6、7 ...
出力:私はあなたのことを考えるとNA、NA、1、2、3、4、5 ...

+0

質問を延長し、遅延ストリームの意味を説明できますか?ありがとうございます –

+1

@FabianHueske、私は遅れたdatastreamと思う、彼はいつもより遅くdatasteamの要素を得ることを意味します。例えば、1分の遅れは、ストリームに到着したときよりも1分遅れて要素を放出する。 –

+1

質問は「kイベントに遅れている」と言っていますが、「x分遅れていません」。 1つの解釈は、k個のイベントのFIFO待ち行列に新しいイベントを追加し、新しいイベントが到着したときに待ち行列ヘッド要素を転送する。所望のセマンティクスを明確に定義しなければ、その質問に答えることはできません。 –

答えて

4

要件を正しく実装するには、FIFOキューを使用してFlatMapFunctionとして実装します。キューバッファーはkのイベントをバッファーし、新しいイベントが到着するたびにヘッドを放出します。フォールトトレラントなストリーミングアプリケーションが必要な場合は、キューを状態として登録する必要があります。 Flinkは、状態(つまりキュー)をチェックポイントし、障害が発生した場合にそれを復元します。

FlatMapFunctionは、次のようになります。時間差で

class Lagger(val k: Int) 
    extends FlatMapFunction[X, X] 
    with Checkpointed[mutable.Queue[X]] 
{ 

    var fifo: mutable.Queue[X] = new mutable.Queue[X]() 

    override def flatMap(value: X, out: Collector[X]): Unit = { 
    // add new element to queue 
    fifo.enqueue(value) 
    if (fifo.size == k + 1) { 
     // remove head element and emit 
     out.collect(fifo.dequeue()) 
    } 
    } 

    // restore state 
    override def restoreState(state: mutable.Queue[X]) = { fifo = state } 

    // get state to checkpoint 
    override def snapshotState(cId: Long, cTS: Long): mutable.Queue[X] = fifo 

} 

返す要素が複雑です。新しい要素が到着したときにだけ関数が呼び出されるため、これには放出のためのタイマスレッドが必要です。

+0

ありがとう!これは素晴らしい。 :) – Kauchy

関連する問題