Pub/Subから読み込んだデータフロー(Beam SDK 2.0.0)を作成しています。ウィンドウ内の要素を数え、その数をtimeseriesとしてBigTableに保存します。ウィンドウは1分の長さに固定されています。Cloud Dataflowウィンドウのトリガーが閉じたウィンドウの値を上書きする
私の意図は、現在の時間ウィンドウでリアルタイム更新を得るために、トリガを使用して毎秒現在のウィンドウの値を更新することでした。
しかし、それは動作していないようです。値は毎秒正しく更新されますが、データフローが次の分に作業を開始すると、最初の値がゼロに更新されます。だから、基本的に私の最後の値は正しいです、残りはすべてゼロです。
Pipeline pipeline = Pipeline.create(options);
PCollection<String> live = pipeline
.apply("Read from PubSub", PubsubIO.readStrings()
.fromSubscription("projects/..."))
.apply("Window per minute",
Window
.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.orFinally(AfterWatermark.pastEndOfWindow()))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO)
);
トリガーコードで試してみましたが、何も役立たないです。私の唯一の選択肢は、.trigger
ブロック全体を削除することです。誰も同じような行動を経験していますか?