私は、Googleのパブ/サブから読み込まれ、Googleのクラウドストレージにデータを書き込みますデータフローパイプラインを書いている:ウィンドウを出力する前に、無制限のApache Beamパイプラインのウィンドウ内のすべての要素にどのように変換を適用しますか?
pipeline.apply(marketData)
.apply(ParDo.of(new PubsubMessageToByteArray()))
.apply(ParDo.of(new ByteArrayToString()))
.apply(ParDo.of(new StringToMarketData()))
.apply(ParDo.of(new AddTimestamps()))
.apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
.withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
.accumulatingFiredPanes())
.apply(ParDo.of(new MarketDataToCsv()))
.apply("Write File(s)", TextIO
.write()
.to(options.getOutputDirectory())
.withWindowedWrites()
.withNumShards(1)
.withFilenamePolicy(new WindowedFilenamePolicy(outputBaseDirectory))
.withHeader(csvHeader));
pipeline.run().waitUntilFinish();
私は要素を重複排除し、その結果を出力する前に、ウィンドウ内の要素をソートしたいです。これは一般的なPTransformとは異なり、ウィンドウが終了するとトランスフォームを実行する必要があります。
1人のワーカーが失敗した場合に複数のワーカーが同じメッセージを生成しているため、Pub/Subトピックに重複があります。書き込む前にウィンドウ内の重複をすべて削除するにはどうすればよいですか? RemoveDuplicatesクラスはBeamバージョン0.2に存在していましたが、現在のバージョンには存在しませんでした。
私は、フードの下では、ビームがPTトランスフォームを作業者間で並列化することを理解します。しかし、このパイプラインはwithNumShards(1)
と書いているので、1人のワーカーしか最終的な結果を書くことはありません。理論的には、そのワーカーに書面化の前に重複排除変換を適用させることが可能であることを意味します。
Beam python sdk still has a RemoveDuplicates methodだから、私はそのロジックをJavaで再現できますが、より良い方法がない限り、なぜそれは削除されていますか?私は実装がいくつかのウィンドウトリガの後に実行された重複排除ParDoであると思います。
編集:GroupByKeyとSortValuesは、私が必要とするように見えます。私は今それらを使用しようとしています。ここで