1

pubサブストリームのストリームを読み込んで大きなクエリに書き込むデータストアフローを作成しようとしています。CloudflowからBigQueryへの書き込み:入力からサイド入力ビューを作成できません

私は、スタックトレースを持つ「入力からサイド入力ビューを作成できません」というエラーを取得ツールを実行しようとすると:

Exception in thread "main" java.lang.IllegalStateException: Unable to create a side-input view from input 
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:277) 
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:268) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:366) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274) 
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161) 
at com.google.cloud.dataflow.sdk.io.Write$Bound.createWrite(Write.java:214) 
at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:79) 
at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:68) 
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.apply(DirectPipelineRunner.java:247) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:290) 
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:174) 
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$Bound.apply(BigQueryIO.java:1738) 
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$Bound.apply(BigQueryIO.java:1440) 
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.apply(DirectPipelineRunner.java:247) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274) 
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161) 
at co.uk.bubblestudent.dataflow.StarterPipeline.main(StarterPipeline.java:116) 
Caused by: java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey. 
at com.google.cloud.dataflow.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:192) 
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:275) 
... 20 more 

私のコードは次のとおりです。

public class StarterPipeline { 


public static final Duration ONE_DAY = Duration.standardDays(1); 
public static final Duration ONE_HOUR = Duration.standardHours(1); 
public static final Duration TEN_SECONDS = Duration.standardSeconds(10); 
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); 

    private static TableSchema schemaGen() { 
    List<TableFieldSchema> fields = new ArrayList<>(); 
    fields.add(new TableFieldSchema().setName("facebookID").setType("STRING")); 
    fields.add(new TableFieldSchema().setName("propertyID").setType("STRING")); 
    fields.add(new TableFieldSchema().setName("time").setType("TIMESTAMP")); 
    TableSchema schema = new TableSchema().setFields(fields); 
    return schema; 
    } 

    public static void main(String[] args) { 
    LOG.info("Starting"); 
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); 
    LOG.info("Pipeline made"); 
    // For Cloud execution, set the Cloud Platform project, staging location, 
    // and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner. 
    options.setProject(<project>); 
    options.setStagingLocation(<bucket>); 
    options.setTempLocation(<bucket>); 
    Pipeline p = Pipeline.create(options); 


    TableSchema schema = schemaGen(); 
    LOG.info("Schema made"); 
    try { 
    LOG.info(schema.toPrettyString()); 
} catch (IOException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
} 
    PCollection<String> input = p.apply(PubsubIO.Read.named("ReadFromPubsub").subscription(<subscription>)); 

    PCollection<TableRow> pardo = input.apply(ParDo.of(new FormatAsTableRowFn())); 
    LOG.info("Formatted Row"); 

    pardo.apply(BigQueryIO.Write.named("Write into BigQuery").to(<table>) 
     .withSchema(schema) 
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 
    LOG.info("about to run"); 
    p.run(); 

    } 


    static class FormatAsTableRowFn extends DoFn<String, TableRow> { 
    @Override 
    public void processElement(ProcessContext c) { 
     LOG.info("Formatting"); 
     String json = c.element(); 

     //HashMap<String,String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType()); 

     // Make a BigQuery row from the JSON object: 
     TableRow row = new TableRow() 
      .set("facebookID","324234") 
      .set("properttyID", "23423") 
      .set("time", "12312313123"); 


     /* 
     *  TableRow row = new TableRow() 
      .set("facebookID", items.get("facbookID")) 
      .set("properttyID", items.get("propertyID")) 
      .set("time", items.get("time")); 
     */ 
     c.output(row); 
    } 
    } 
} 

どれをこれが何であるかに関する提案?

+0

使用しているDataflow SDKのバージョンは何ですか? – danielm

+0

1.1.2 for eclipse –

+0

私は1.1.2リリースがあったとは思わない。データフローは1.6.0になりました。あなたはそれを試すことができますか? – danielm

答えて

1

BigQueryIOのデフォルトの実装は境界のあるPCollectionに対してのみ機能し、PubsubIO.Readは無制限のPCollectionを生成します。

これを修正するには、PubsubIOトランスフォームでmaxReadTimeまたはmaxNumElementsを呼び出して入力をバインドするか、オプションでsetStreaming(true)を呼び出すことでBigQueryIOのストリーミング挿入型を使用できます。

+0

ああ、大丈夫、答えてくれてありがとう!私は明日の朝に事務所でこれをチェックします。 –

+0

これはうまくいってプロジェクトが始まりましたが、2014年8月4日9:47:55 AM com.google.api.client.http.HttpRequest execute 警告:要求の実行中に例外がスローされました java.net .SocketTimeoutException:20秒おきにRead out timed out。私はこれを見て、あなたはソケットのタイムアウト時間を編集する必要があるようですね?しかし、いくつかの簡単な例ではこれが行われていないので、私はうまくいくと思っています - https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete /TrafficRoutes.java –

+0

例外がどこから来たのか、もう少し詳しく知ることができますか?一般に、タイムアウトを混乱させる必要はありませんが、ローカルで実行している場合は、遅延の遅いネットワークを使用している可能性があります。 – danielm

関連する問題