2017-12-14 6 views
0

私は現在、リアルタイムデータを処理するためにGoogle DataflowでApache Beamを使用しています。データは無制限のGoogle PubSubから取得されます。そのため、現在はストリーミングパイプラインを使用しています。しかし、ストリーミングパイプラインを24時間稼働させるのは非常に高価です。コストを削減するために、一定の時間間隔(たとえば30分ごと)で実行するバッチパイプラインに切り替えることを検討しています。なぜなら、処理がユーザーにとってリアルタイムであることは本当に重要ではないからです。Apache Beam:無制限ソースを持つバッチパイプライン

バインドされたソースとしてPubSubサブスクリプションを使用することができるのだろうか?私の考えは、ジョブが実行されるたびに、トリガーする前に1分間データを蓄積するということです。今のところそれは可能ではないようですが、私はBoundedReadFromUnboundedSourceというクラス(私はどのように使用するのか分かりません)に出くわしました。私は次の手順を実行しようとしましたが、仕事はまだストリーミングモードで実行

PCollection<MyData> data = pipeline 
      .apply("ReadData", PubsubIO 
        .readMessagesWithAttributes() 
        .fromSubscription(options.getInput())) 
      .apply("ParseData", ParDo.of(new ParseMyDataFn())) 
      .apply("Window", Window 
        .<MyData>into(new GlobalWindows()) 
        .triggering(Repeatedly 
          .forever(AfterProcessingTime 
            .pastFirstElementInPane() 
            .plusDelayOf(Duration.standardSeconds(5)) 
          ) 
        ) 
        .withAllowedLateness(Duration.ZERO).discardingFiredPanes() 
      ); 

PCollection<MyData> data = pipeline 
      .apply("ReadData", PubsubIO 
        .readMessagesWithAttributes() 
        .fromSubscription(options.getInput())) 
      .apply("ParseData", ParDo.of(new ParseMyDataFn())) 

      // Is there a way to make the window trigger once and turning it into a bounded source? 
      .apply("Window", Window 
        .<MyData>into(new GlobalWindows()) 
        .triggering(AfterProcessingTime 
         .pastFirstElementInPane() 
         .plusDelayOf(Duration.standardMinutes(1)) 
        ) 
        .withAllowedLateness(Duration.ZERO).discardingFiredPanes() 
      ); 
+0

より良い回答を得るために現在のコードの関連部分を追加してください。 –

+0

@TahirAkhtar私は質問をよく説明するためにいくつかのコードを追加しました。 –

答えて

0

これが明示的にPubsubIOでサポートされていない以下

は、元のように見えるおおよそ方法です現在、ストリーミングジョブを定期的に開始し、数分後にプログラムでDrainを呼び出すこともできます。

関連する問題