2017-12-17 17 views
0

セッションのPCollectionを取得し、チャネル/接続ごとの平均セッション時間を取得しようとしています。私は、初期のトリガーが生成された各ウィンドウに対して発火するようなことをしています.60分のウィンドウが1分ごとにスライドすると、初期のトリガーは60回発火します。アウトプットのタイムスタンプを見ると、毎分60分のウィンドウがあります。私は最後のウィンドウのためにトリガーを1回発射したいので、最後の60分間の平均セッション時間は10秒です。Apache Beam - 複数のウィンドウを出力するウィンドウをスライドさせる

以前はスライディングウィンドウを使用していましたが、期待した結果が得られました。スライドとセッションのウィンドウを混在させることで、何とかこれを引き起こしています。

私はあなたに私のパイプラインの絵を描くみましょう:

まず、私は、アクティブなユーザーに基づいてセッションを作成しています:

.apply("Add Window Sessions", 
Window.<KV<String, String>> into(Sessions.withGapDuration(Duration.standardMinutes(60))) 
    .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS) 
    .triggering(
    AfterWatermark.pastEndOfWindow() 
     .withEarlyFirings(AfterProcessingTime 
     .pastFirstElementInPane() 
     .plusDelayOf(Duration.standardSeconds(10)))) 
    .withAllowedLateness(Duration.ZERO) 
    .discardingFiredPanes() 
) 
.apply("Group Sessions", Latest.perKey()) 

ステップ後にこのセッションオブジェクト、計算セッション期間を作成、などこれはPCollection(Session)で終わります。

私はPcollection(セッション)からKV接続を作成します。

次に、スライディングウィンドウを適用してから平均を適用します。

.apply("Apply Rolling Minute Window", 
     Window. < KV < String, Integer >> into(
     SlidingWindows 
     .of(Duration.standardMinutes(60)) 
     .every(Duration.standardMinutes(1))) 
     .triggering(
     Repeatedly.forever(
     AfterWatermark.pastEndOfWindow() 
     .withEarlyFirings(AfterProcessingTime 
     .pastFirstElementInPane() 
     .plusDelayOf(Duration.standardSeconds(10))) 
     ) 
    ) 
     .withAllowedLateness(Duration.standardMinutes(1)) 
     .discardingFiredPanes() 
    ) 
    .apply("Get Average", Mean.perKey()) 

これは私が問題を見ている時点です。私が見たいと思うのは、キー当たりの平均出力時間が1つの出力です。私が実際に目にしているのは、毎分60分の同じキーに対する60の出力です。

EARLY 2017-12-17T20:41:59.999Z 
EARLY 2017-12-17T20:43:59.999Z 
EARLY 2017-12-17T20:56:59.999Z 
(cont) 

ログは12月で印刷された:私は60分未来にこの出力にタイムスタンプを持つ60回を取得

LOG.info(c.pane().getTiming() + " " + c.timestamp()); 

:CはProcessContextいるとDoFnでこのログでは

17,2018 19:35:19 出力数は常にウィンドウサイズ/スライドの持続時間です。ですから、5分ごとに60分のウィンドウを作成すれば、12個の出力が得られます。

答えて

0

私はこれを理解したと思います。

スライディングウィンドウは、.every()関数を使用して新しいウィンドウを作成します。早期発火の設定は各ウィンドウに適用されるので、複数回の発火は理にかなっています。

私の使用例に合わせて、現在のウィンドウのみを出力するには、結果を出力して周波数を制御する.every()を調整する前に、c.pane()、isFirst()== trueをチェックします。 。

関連する問題