0

Pub/Subから読み込んだデータフロー(Beam SDK 2.0.0)を作成しています。ウィンドウ内の要素を数え、その数をtimeseriesとしてBigTableに保存します。ウィンドウは1分の長さに固定されています。Cloud Dataflowウィンドウのトリガーが閉じたウィンドウの値を上書きする

私の意図は、現在の時間ウィンドウでリアルタイム更新を得るために、トリガを使用して毎秒現在のウィンドウの値を更新することでした。

しかし、それは動作していないようです。値は毎秒正しく更新されますが、データフローが次の分に作業を開始すると、最初の値がゼロに更新されます。だから、基本的に私の最後の値は正しいです、残りはすべてゼロです。

Pipeline pipeline = Pipeline.create(options); 

PCollection<String> live = pipeline 
     .apply("Read from PubSub", PubsubIO.readStrings() 
     .fromSubscription("projects/...")) 
     .apply("Window per minute", 
      Window 
       .<String>into(FixedWindows.of(Duration.standardMinutes(1))) 
       .triggering(Repeatedly 
        .forever(AfterProcessingTime 
         .pastFirstElementInPane() 
         .plusDelayOf(Duration.standardSeconds(1)))           
        .orFinally(AfterWatermark.pastEndOfWindow())) 
       .accumulatingFiredPanes() 
       .withAllowedLateness(Duration.ZERO) 
      ); 

トリガーコードで試してみましたが、何も役立たないです。私の唯一の選択肢は、.triggerブロック全体を削除することです。誰も同じような行動を経験していますか?

答えて

0

私の問題をGoogleに報告した後、Beam SDKの問題を発見しました。これらのリンクについての詳細:一緒に

EOWとGCタイマーの火(ゼロ以外許さ遅刻)私たちは、それが最終ペインであることに注意することはできない。彼らがでてくる場合https://issues.apache.org/jira/browse/BEAM-2505

処理時間のタイマーが適切に無視されていませんGCタイマー:https://issues.apache.org/jira/browse/BEAM-2502

処理時間のタイマーがちょうど完全に間違って別のタイムドメインからのタイムスタンプを比較し、GCタイマーとして解釈されます。https://issues.apache.org/jira/browse/BEAM-2504

関連する問題