2016-11-06 2 views
2

ストリーミングモードでpubsubを介してデータフローにメッセージを受信して​​います(これは私の欲求に必要です)。 各メッセージは、GCSの独自のファイルに保存する必要があります。 TextIO.Writeの無制限コレクションはサポートされていないので、私は1つの要素をそれぞれ含むウィンドウにPCollectionを分割しようとしました。 各ウィンドウをgoogle-cloud-storageに書き込みます。私はまだ同じエラーがウインドウの前に持って受け取るClouding Dataflowを使用してPubSubからウィンドウを使用してGoogle Cloud Storageに書き込む

public static void main(String[] args) {  

      DataflowPipelineOptions options = PipelineOptionsFactory.create() 
        .as(DataflowPipelineOptions.class); 
       options.setRunner(BlockingDataflowPipelineRunner.class);     
       options.setProject(PROJECT_ID);    
       options.setStagingLocation(STAGING_LOCATION); 
       options.setStreaming(true); 
       Pipeline pipeline = Pipeline.create(options); 

       PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub") 
         .subscription(SUBSCRIPTION); 

       PCollection<String> streamData = pipeline.apply(readFromPubsub);   



       PCollection<String> windowedMessage = streamData.apply(Window.<String>triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes()); 
      e 


       windowedMessage.apply(TextIO.Write.to("gs://pubsub-outputs/1")); 

       pipeline.run(); 
     } 

は、ここに私のコードです。

The DataflowPipelineRunner in streaming mode does not support TextIO.Write. 

上記を実行するためのコードは何ですか?

+0

可能な複製(http://stackoverflow.com/questions/40402150/creating-a-custom-sink-in-data-flow) – jkff

答えて

2

TextIOはBound PCollectionと連携して、APIストレージを使用してGCSに書き込むことができます。

あなたは行うことができます:

PipeOptions options = data.getPipeline().getOptions().as(PipeOptions.class); 
    data.apply(WithKeys.of(new SerializableFunction<String, String>() { 
      public String apply(String s) { return "mykey"; } }))   

    .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(options.getTimeWrite())))) 
    .apply(GroupByKey.create()) 
    .apply(Values.<Iterable<String>>create()) 
    .apply(ParDo.of(new StorageWrite(options))); 

あなたはGROUPBYの操作でウィンドウを作成し、ストレージに反復可能で書くことができます。 StorageWriteのprocessElement:[データフローでカスタムシンクを作成する]の

 PipeOptions options = c.getPipelineOptions().as(PipeOptions.class); 
     String date = ISODateTimeFormat.date().print(c.window().maxTimestamp()); 
     String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp()); 
     String blobName = String.format("%s/%s/%s", options.getBucketRepository(), date, options.getFileOutName() + isoDate); 

     BlobId blobId = BlobId.of(options.getGCSBucket(), blobName); 

     WriteChannel writer = storage.writer(BlobInfo.builder(blobId).contentType("text/plain").build()); 

     for (Iterator<String> it = c.element().iterator(); it.hasNext();) { 
      writer.write(ByteBuffer.wrap(it.next().getBytes())); 
     } 
     writer.close(); 
+0

いいですね....試してみます – Ruchy

+0

うまくいけば答えを受け入れてください。 –

関連する問題