GoogleのデータフローがGoogleのクラウドストレージから圧縮ファイルを読み込む方法は最近変更されていますか?私はGCSから圧縮CSVログファイルを読み込み、これらのファイルをデータフローパイプラインのソースとして使用するプロジェクトに取り組んでいます。最近まで、これはファイルの圧縮タイプを指定して、または指定しなくても完全に機能しました。Googleのクラウドデータフローの圧縮ファイルの処理の変更
現在、DoFn内のprocessElementメソッドは、ファイルに多数の行がありますが、(csvヘッダー行の)1回だけ呼び出されます。同じソースファイルを圧縮解除して使用すると、すべてが期待通りに機能します(processElementメソッドはすべての行で呼び出されます)。ここに示唆したように、https://stackoverflow.com/a/27775968/6142412にContent-Encodingをgzipに設定することはできますが、以前はこれを行う必要はありませんでした。
DirectPipelineRunnerまたはDataflowPipelineRunnerを使用しているときに、この問題が発生しています。私は、クラウドデータフローsdkのバージョン1.5.0を使用しています。
申し訳ありません。入力を読み込むために使用しているものを明確にすることができますか(TextIOですか?ファイル名を除いてパラメータを設定しますか?)、誤った動作IDの例を挙げますか? – jkff
また、DoFnがヘッダー行に対してのみ呼び出されていることをどのように検出しているのかを明確にしてください。 – jkff
@jkff DoFn processElement呼び出しを検出するには、DirectPipelineRunnerでデバッグするときにprocessElementメソッドでブレークポイントを追加しました。 TextIOを使用して入力を読み込んでいます。 (ファイル名).withCompressionType(TextIO.CompressionType.GZIP) – Davin