google-cloud-dataflow

    0

    1答えて

    セッションのPCollectionを取得し、チャネル/接続ごとの平均セッション時間を取得しようとしています。私は、初期のトリガーが生成された各ウィンドウに対して発火するようなことをしています.60分のウィンドウが1分ごとにスライドすると、初期のトリガーは60回発火します。アウトプットのタイムスタンプを見ると、毎分60分のウィンドウがあります。私は最後のウィンドウのためにトリガーを1回発射したいので

    0

    2答えて

    Apache Beam 2.xに移行するには、可能な限りテンプレートを使用し、それに応じてValueProviderを使用したいと考えています。 私のロジックでは、FixedWindowを使用していますが、期間は柔軟なので、むしろValueProviderから取得します。 FixedWindows.of()はDurationしか取得できないため、ValueProviderを取得してそこから期間を取

    -1

    1答えて

    を受け取りましたGoogleのクラウドデータフローを使用してデータのキャッシュを作成し、Webリクエストを高速化しています。データセットとそれがどのようにグループ化されるかは、私たちのコントロールから少し外れているので、私たちはいくつかの非常に非正統なことをやっています。とにかく、私たちはしばらくこのエラーを毎回受けていますが、仕事は時折実行され、成功することがあります。 ジョブID:2017-1

    0

    1答えて

    Google DataflowにApache Beam SDKを使用する一連のトランザクションから日次の残高を計算したいと考えています。目的の結果セットは次のようになり John, 2017-12-01, 100 John, 2017-12-01, 200 Jane, 2017-12-01, 150 John, 2017-12-02, -100 John, 2017-12-02, 300

    0

    1答えて

    私は、Apacheのビーム上でKafkaIOを使用して複数のkafkaブローカーから読もうとしています。オフセット管理のデフォルトオプションはkafkaパーティション自体です(kafka> 0.9のzookeperは使用しません)。この設定では、私はジョブ/パイプラインを再起動すると、重複しているレコードと不足しているレコードに問題があります。 私が読んだことから、これを処理する最良の方法は、外部

    0

    1答えて

    Eclipse IDEと関連プラグインを使用している場合、IDEからローカルでもデータフローでも簡単にジョブを実行できます。 ジョブを「プロダクト化」しようとすると(テンプレートを使用できない既知のDataflow/Bigqueryの制限のために)、「クラウドビルド」環境でコマンドラインから「mvn package」を使用しようとしています。それは常に "パッケージorg.apache.beam.

    0

    1答えて

    私は一定の時間間隔でトリガーするGoogleクラウドデータフロージョブを持っています。データフロージョブが完了した後で、ジョブのステータスを特定のメールIDに送信した後、メールをトリガーする必要があります。あなたはまた、result.metrics()によって返された電子メールに完成したジョブのメトリックを含めることができ PipelineResult result = pipeline.run(

    1

    2答えて

    私のクエリが何百万行も返された場合、JdbcIOがどのようにクエリを並列に実行するかを知りたいと思います。 私はhttps://issues.apache.org/jira/browse/BEAM-2803と関連するプルリクエストを参照しました。私はそれを完全に理解できませんでした。 ReadAllexpandの方法はParDoを使用します。したがって、データベースへの複数の接続を作成して、データ

    0

    1答えて

    Cloud Pub/Subからデータを読み取り、それをCloud DataflowでBigQueryに書きたいとします。各データには、データ自体が保存されるテーブルIDが含まれています。 のBigQueryへの書き込みに失敗した様々な要因があります。 表のIDのフォーマットが間違っているが。 データセットが存在しません。 データセットでは、パイプラインにアクセスできません。 ネットワーク障害。 エ

    2

    1答えて

    GCSから読み取るときにファイル名を取得するには、とにかく: p.apply( "GCSからの読み取り"、TextIO.read()。( "gs:// path/*")) 。 出力を適切なテーブルに保存するには、次のParDoにファイル名が必要です。 この質問はHow to Get Filename when using file pattern match in google-cloud-dat