Google Cloud Storageに保存されたカスタマイズされたバイナリファイルをデータフロージョブで処理する必要があります。FileBasedSourceはGoogle Cloud Storageのいくつかの特定のファイルに対応するグロブを理解できません
これを行うには、私はカスタムFileBasedSourceを書きました。ドキュメントに記載されているように、は、Javaグロブ、単一ファイル、または1つのファイルのオフセット範囲として定義されたファイルパターンでバックアップされています。
私の場合、私は、/path/{file1,file1,file3}
のようないくつかの特定のファイル名を持つJavaグロブを使用する必要があります。私は、ローカルファイルシステム上でそれをテストするとき、それが正常に動作しますが、私はGoogleのクラウドストレージ(gs://bucket/{file1,file2,file3}
)とそれを使用する場合には、任意のファイルを見つけることができないと私は、次のスタックトレースを取得:私は正確にこれを使用する場合は
java.io.IOException: Error executing batch GCS request
at org.apache.beam.sdk.util.GcsUtil.executeBatches(GcsUtil.java:603)
at org.apache.beam.sdk.util.GcsUtil.getObjects(GcsUtil.java:342)
at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.matchNonGlobs(GcsFileSystem.java:217)
at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.match(GcsFileSystem.java:86)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:111)
at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:207)
at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:78)
at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:53)
at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:40)
at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:37)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:439)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:602)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:276)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:210)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:383)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:173)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:556)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:167)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
at com.travelaudience.data.job.rtbtobigquery.Main$.main(Main.scala:74)
at com.travelaudience.data.job.rtbtobigquery.Main.main(Main.scala)
Caused by: java.util.concurrent.ExecutionException: com.google.api.client.http.HttpResponseException: 400 Bad Request
at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:500)
at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:479)
at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
at org.apache.beam.sdk.util.GcsUtil.executeBatches(GcsUtil.java:595)
... 23 more
Caused by: com.google.api.client.http.HttpResponseException: 400 Bad Request
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1070)
at com.google.api.client.googleapis.batch.BatchRequest.execute(BatchRequest.java:241)
at org.apache.beam.sdk.util.GcsUtil$3.call(GcsUtil.java:588)
at org.apache.beam.sdk.util.GcsUtil$3.call(GcsUtil.java:586)
at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
を同じglobをgsutil
としてください。gsutil ls gs://bucket/{file1,file2,file3}
のように3つのファイルが正しくリストされています。コードから、gs://bucket/dir/*
のようなグロブが働きます。
私はビームバージョン2.1.0を使用します。
何が間違っていますか?
ありがとうございました!
回答ありがとうございます。あなたは '{'かどうかをサポートするつもりですか?これは本当に役に立ちます。 – bnjzer
私は、特にこれに取り組んでいる人は誰も気づいていませんが、もちろんそれは歓迎すべき貢献です。 Beam 2.2(既にmasterで利用可能で、リリースは現在投票中です)では、複数の別々のファイルがある場合は、FileIO.match()/ matchAll()/ read()の使用を検討してください。 FileBasedSourceを実装するよりも、ほんの一桁の定型文を必要とします。 – jkff