2017-12-13 3 views
0

私は、Googleのパブ/サブから読み込まれ、Googleのクラウドストレージにデータを書き込みますデータフローパイプラインを書いている:ウィンドウを出力する前に、無制限のApache Beamパイプラインのウィンドウ内のすべての要素にどのように変換を適用しますか?

pipeline.apply(marketData) 
     .apply(ParDo.of(new PubsubMessageToByteArray())) 
     .apply(ParDo.of(new ByteArrayToString())) 
     .apply(ParDo.of(new StringToMarketData())) 
     .apply(ParDo.of(new AddTimestamps())) 
     .apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow()))) 
       .withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness())) 
       .accumulatingFiredPanes()) 
     .apply(ParDo.of(new MarketDataToCsv())) 
     .apply("Write File(s)", TextIO 
       .write() 
       .to(options.getOutputDirectory()) 
       .withWindowedWrites() 
       .withNumShards(1) 
       .withFilenamePolicy(new WindowedFilenamePolicy(outputBaseDirectory)) 
       .withHeader(csvHeader)); 

    pipeline.run().waitUntilFinish(); 

私は要素を重複排除し、その結果を出力する前に、ウィンドウ内の要素をソートしたいです。これは一般的なPTransformとは異なり、ウィンドウが終了するとトランスフォームを実行する必要があります。

1人のワーカーが失敗した場合に複数のワーカーが同じメッセージを生成しているため、Pub/Subトピックに重複があります。書き込む前にウィンドウ内の重複をすべて削除するにはどうすればよいですか? RemoveDuplicatesクラスはBeamバージョン0.2に存在していましたが、現在のバージョンには存在しませんでした。

私は、フードの下では、ビームがPTトランスフォームを作業者間で並列化することを理解します。しかし、このパイプラインはwithNumShards(1)と書いているので、1人のワーカーしか最終的な結果を書くことはありません。理論的には、そのワーカーに書面化の前に重複排除変換を適用させることが可能であることを意味します。

Beam python sdk still has a RemoveDuplicates methodだから、私はそのロジックをJavaで再現できますが、より良い方法がない限り、なぜそれは削除されていますか?私は実装がいくつかのウィンドウトリガの後に実行された重複排除ParDoであると思います。

編集:GroupByKeySortValuesは、私が必要とするように見えます。私は今それらを使用しようとしています。ここで

答えて

0

は、重複排除の一部のための答えです:

.apply(Distinct 
// MarketData::key produces a String. Use withRepresentativeValue() because Apache beam deserializes Java objects into bytes, which could cause two equal objects to be interpreted as not equal. See org/apache/beam/sdk/transforms/Distinct.java for details. 
.withRepresentativeValueFn(MarketData::key) 
.withRepresentativeType(TypeDescriptor.of(String.class))) 

そして、ここでは、(並べ替えも必要とされている場合)の要素をソートし、重複排除のためのソリューションです:

public static class DedupAndSortByTime extends Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> { 
    @Override 
    public TreeSet<MarketData> createAccumulator() { 
     return new TreeSet<>(Comparator 
       .comparingLong(MarketData::getEventTime) 
       .thenComparing(MarketData::getOrderbookType)); 
    } 

    @Override 
    public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) { 
     accum.add(input); 
     return accum; 
    } 

    @Override 
    public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) { 

     TreeSet<MarketData> merged = createAccumulator(); 
     for (TreeSet<MarketData> accum : accums) { 
      merged.addAll(accum); 
     } 
     return merged; 
    } 

    @Override 
    public List<MarketData> extractOutput(TreeSet<MarketData> accum) { 
     return Lists.newArrayList(accum.iterator()); 
    } 
} 

ので、更新パイプラインは

です
// Pipeline 
    pipeline.apply(marketData) 
     .apply(ParDo.of(new MarketDataDoFns.PubsubMessageToByteArray())) 
     .apply(ParDo.of(new MarketDataDoFns.ByteArrayToString())) 
     .apply(ParDo.of(new MarketDataDoFns.StringToMarketDataAggregate())) 
     .apply(ParDo.of(new MarketDataDoFns.DenormalizeMarketDataAggregate())) 
     .apply(ParDo.of(new MarketDataDoFns.AddTimestamps())) 
     .apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow()))) 
       .withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness())) 
       .accumulatingFiredPanes()) 
     .apply(Combine.globally(new MarketDataCombineFn.DedupAndSortByTime()).withoutDefaults()) 
     .apply(ParDo.of(new MarketDataDoFns.MarketDataToCsv())) 
     .apply("Write File(s)", TextIO 
       .write() 
       // This doesn't set the output directory as expected. "/output" gets stripped and I don't know why, 
       // so "/output" has to be added to the directory path within the FilenamePolicy. 
       .to(options.getOutputDirectory()) 
       .withWindowedWrites() 
       .withNumShards(1) 
       .withFilenamePolicy(new MarketDataFilenamePolicy.WindowedFilenamePolicy(outputBaseDirectory)) 
       .withHeader(csvHeader)); 

    pipeline.run().waitUntilFinish(); 
関連する問題