1

Google Cloud Storageに保存されたカスタマイズされたバイナリファイルをデータフロージョブで処理する必要があります。FileBasedSourceはGoogle Cloud Storageのいくつかの特定のファイルに対応するグロブを理解できません

これを行うには、私はカスタムFileBasedSourceを書きました。ドキュメントに記載されているように、は、Javaグロブ、単一ファイル、または1つのファイルのオフセット範囲として定義されたファイルパターンでバックアップされています

私の場合、私は、/path/{file1,file1,file3}のようないくつかの特定のファイル名を持つJavaグロブを使用する必要があります。私は、ローカルファイルシステム上でそれをテストするとき、それが正常に動作しますが、私はGoogleのクラウドストレージ(gs://bucket/{file1,file2,file3})とそれを使用する場合には、任意のファイルを見つけることができないと私は、次のスタックトレースを取得:私は正確にこれを使用する場合は

java.io.IOException: Error executing batch GCS request 
     at org.apache.beam.sdk.util.GcsUtil.executeBatches(GcsUtil.java:603) 
     at org.apache.beam.sdk.util.GcsUtil.getObjects(GcsUtil.java:342) 
     at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.matchNonGlobs(GcsFileSystem.java:217) 
     at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.match(GcsFileSystem.java:86) 
     at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:111) 
     at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:207) 
     at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:78) 
     at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:53) 
     at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:40) 
     at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:37) 
     at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:439) 
     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:602) 
     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594) 
     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:276) 
     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:210) 
     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440) 
     at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:383) 
     at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:173) 
     at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:556) 
     at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:167) 
     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297) 
     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283) 
     at com.travelaudience.data.job.rtbtobigquery.Main$.main(Main.scala:74) 
     at com.travelaudience.data.job.rtbtobigquery.Main.main(Main.scala) 
Caused by: java.util.concurrent.ExecutionException: com.google.api.client.http.HttpResponseException: 400 Bad Request 
     at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:500) 
     at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:479) 
     at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76) 
     at org.apache.beam.sdk.util.GcsUtil.executeBatches(GcsUtil.java:595) 
     ... 23 more 
Caused by: com.google.api.client.http.HttpResponseException: 400 Bad Request 
     at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1070) 
     at com.google.api.client.googleapis.batch.BatchRequest.execute(BatchRequest.java:241) 
     at org.apache.beam.sdk.util.GcsUtil$3.call(GcsUtil.java:588) 
     at org.apache.beam.sdk.util.GcsUtil$3.call(GcsUtil.java:586) 
     at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111) 
     at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58) 
     at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
     at java.lang.Thread.run(Thread.java:748) 

を同じglobをgsutilとしてください。gsutil ls gs://bucket/{file1,file2,file3}のように3つのファイルが正しくリストされています。コードから、gs://bucket/dir/*のようなグロブが働きます。

私はビームバージョン2.1.0を使用します。

何が間違っていますか?

ありがとうございました!

答えて

1

ビームサポートonly a subset of the glob syntaxと一致するGCSファイル。これは*?をサポートしますが、{}はサポートしません。私たちのドキュメントは現在のところ説明していません - FileSystems.match()に文書化され、グロブマッチングがユーザーに浮かび上がっている他のクラスからリンクされているはずです。

+0

回答ありがとうございます。あなたは '{'かどうかをサポートするつもりですか?これは本当に役に立ちます。 – bnjzer

+0

私は、特にこれに取り組んでいる人は誰も気づいていませんが、もちろんそれは歓迎すべき貢献です。 Beam 2.2(既にmasterで利用可能で、リリースは現在投票中です)では、複数の別々のファイルがある場合は、FileIO.match()/ matchAll()/ read()の使用を検討してください。 FileBasedSourceを実装するよりも、ほんの一桁の定型文を必要とします。 – jkff

関連する問題