2017-10-13 5 views
0

Bigqueryでは、現時点では日付のみでパーティションを作成できます。テーブルのパーティション化

私はフィールドと10億テーブルの行を持っています。このフィールドの有効期限は1年前です。

既存のデータを新しいパーティションテーブルに移動する正しい方法は何ですか?

編集私は、データをウィンドウイングテーブル名(またはパーティションの接尾辞)をパラメータ化することである< 2.0 Sharding BigQuery output tablesBigQuery partitioning with Beam streamsで精緻化バージョンを使用してJavaでエレガントな解決策があった見ました。

しかし、私は2.xビームプロジェクトで、Pythonのシリアライズ可能な関数からウィンドウ時間を取得するサンプルもありません。

パイプでパーティションを作成しようとしましたが、多数のパーティションで失敗した場合(100で動作していても1000で失敗しました)。

これは限り私ができるように私のコードです:

   ( p 
       | 'lectura' >> beam.io.ReadFromText(input_table) 
       | 'noheaders' >> beam.Filter(lambda s: s[0].isdigit()) 
       | 'addtimestamp' >> beam.ParDo(AddTimestampDoFn()) 
       | 'window' >> beam.WindowInto(beam.window.FixedWindows(60)) 
       | 'table2row' >> beam.Map(to_table_row) 
       | 'write2table' >> beam.io.Write(beam.io.BigQuerySink(
         output_table, #<-- unable to parametrize by window 
         dataset=my_dataset, 
         project=project, 
         schema='dia:DATE, classe:STRING, cp:STRING, import:FLOAT', 
         create_disposition=CREATE_IF_NEEDED, 
         write_disposition=WRITE_TRUNCATE, 
            ) 
           ) 
       ) 

p.run() 
+2

https://stackoverflow.com/questions/38993877/migrating-from-non-partitioned-to-partitioned-tables関連しているカップルのアプローチを持っている必要があります。また、フラットファイルでの作業を避けるために、CSVではなくJSONまたはAVROを使用できるようにする必要があります。 –

+0

@NhanNguyenは、より具体的に私の質問を編集しました。 <2.0で上品なソリューションが存在し、> 2.xでそれを逃してしまいます。あなたのリンクをありがとう、私はそれに従って、非常に関連した問題だった。再度、感謝します。 – danihp

答えて

2

それが現在のJava SDKに限定することができるが、これを行うために必要な機能のすべてが、ビーム内に存在します。

BigQueryIOを使用します。具体的には、DynamicDestinationsを使用して、各行の宛先テーブルを決定することができます。 DynamicDestinationsの例から

events.apply(BigQueryIO.<UserEvent>write() 
    .to(new DynamicDestinations<UserEvent, String>() { 
     public String getDestination(ValueInSingleWindow<String> element) { 
      return element.getValue().getUserId(); 
     } 
     public TableDestination getTable(String user) { 
      return new TableDestination(tableForUser(user), 
      "Table for user " + user); 
     } 
     public TableSchema getSchema(String user) { 
      return tableSchemaForUser(user); 
     } 
     }) 
    .withFormatFunction(new SerializableFunction<UserEvent, TableRow>() { 
    public TableRow apply(UserEvent event) { 
     return convertUserEventToTableRow(event); 
    } 
    })); 
+0

なぜ彼らはそれを行うためのPythonラッパーではありませんか?私はpythonの代わりにJavaでデータフロープロジェクトを買う余裕がありますか? GoogleがJavaのリソースに不具合を抱えているかどうか知っていますか?つまり、私がPythonで作業すると、これよりも多くの機能が欠けてしまうでしょうか?ありがとう! – danihp

+0

これが示すように、JavaとPython SDKの間には異なる機能があります。これらのギャップに対処することは、Apache Beamの継続的な取り組みの一環です。この特定の問題は[BEAM-2801](https://issues.apache.org/jira/browse/BEAM-2801)として追跡されます。 –

関連する問題