2017-12-28 59 views
0

Cloud Pub/Subからデータを読み取り、それをCloud DataflowでBigQueryに書きたいとします。各データには、データ自体が保存されるテーブルIDが含まれています。BigQueryIO.Writeによってスローされた例外をキャッチして、出力に失敗したデータを救助する方法はありますか?

のBigQueryへの書き込みに失敗した様々な要因があります。

  • 表のIDのフォーマットが間違っているが。
  • データセットが存在しません。
  • データセットでは、パイプラインにアクセスできません。
  • ネットワーク障害。

エラーのいずれかが発生すると、ストリーミングジョブはタスクを再試行し、ストールします。私はWriteResult.getFailedInserts()を使用して、不良データを救済し、ストールを回避しようとしましたが、うまく機能しませんでした。良い方法はありますか?パイプライン定義に出力に書き込むときに例外をキャッチする簡単な方法はありません

public class StarterPipeline { 
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); 

    public class MyData implements Serializable { 
    String table_id; 
    } 

    public interface MyOptions extends PipelineOptions { 
    @Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>") 
    @Validation.Required 
    ValueProvider<String> getInputTopic(); 
    void setInputTopic(ValueProvider<String> value); 
    } 

    public static void main(String[] args) { 
    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class); 

    Pipeline p = Pipeline.create(options); 

    PCollection<MyData> input = p 
     .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getInputTopic())) 
     .apply("ParseJSON", MapElements.into(TypeDescriptor.of(MyData.class)) 
      .via((String text) -> new Gson().fromJson(text, MyData.class))); 
    WriteResult writeResult = input 
     .apply("WriteToBigQuery", BigQueryIO.<MyData>write() 
      .to(new SerializableFunction<ValueInSingleWindow<MyData>, TableDestination>() { 
       @Override 
       public TableDestination apply(ValueInSingleWindow<MyData> input) { 
       MyData myData = input.getValue(); 
       return new TableDestination(myData.table_id, null); 
       } 
      }) 
      .withSchema(new TableSchema().setFields(new ArrayList<TableFieldSchema>() {{ 
       add(new TableFieldSchema().setName("table_id").setType("STRING")); 
      }})) 
      .withFormatFunction(new SerializableFunction<MyData, TableRow>() { 
       @Override 
       public TableRow apply(MyData myData) { 
       return new TableRow().set("table_id", myData.table_id); 
       } 
      }) 
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
      .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())); 
    writeResult.getFailedInserts() 
     .apply("LogFailedData", ParDo.of(new DoFn<TableRow, TableRow>() { 
      @ProcessElement 
      public void processElement(ProcessContext c) { 
      TableRow row = c.element(); 
      LOG.info(row.get("table_id").toString()); 
      } 
     })); 

    p.run(); 
    } 
} 

答えて

1

は、ここに私のコードです。 BigQueryのカスタムPTransformを書くことでそれを実現できると思います。しかし、Apache Beamでそれをネイティブに行う方法はありません。私はCloud Dataflowの自動再試行機能を損なうため、これに反対することも推奨します。

コード例では、失敗した挿入リトライポリシーを再試行しないように設定しています。常に再試行するようにポリシーを設定できます。これは、間欠的なネットワーク障害(第4の箇条書き)のようなときにのみ有効です。

.withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry()) 

テーブルIDのフォーマットは(第一の箇条書き)間違っている場合は、CREATE_IF_NEEDEDは配置構成を作成し、テーブルIDが正しくない場合でも、データフロージョブは自動的にエラーなしで新しいテーブルを作成できるようにする必要があります。

データセットにアクセス許可の問題がある場合(第2および第3の箇条書き)、私の意見では、ストリーミングジョブがストールし、最終的に失敗するということです。手作業による介入なしには、いかなる状況下でも進める方法はありません。

関連する問題