ストリーミングモードでpubsubを介してデータフローにメッセージを受信しています(これは私の欲求に必要です)。 各メッセージは、GCSの独自のファイルに保存する必要があります。 TextIO.Writeの無制限コレクションはサポートされていないので、私は1つの要素をそれぞれ含むウィンドウにPCollectionを分割しようとしました。 各ウィンドウをgoogle-cloud-storageに書き込みます。私はまだ同じエラーがウインドウの前に持って受け取るClouding Dataflowを使用してPubSubからウィンドウを使用してGoogle Cloud Storageに書き込む
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject(PROJECT_ID);
options.setStagingLocation(STAGING_LOCATION);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
.subscription(SUBSCRIPTION);
PCollection<String> streamData = pipeline.apply(readFromPubsub);
PCollection<String> windowedMessage = streamData.apply(Window.<String>triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes());
e
windowedMessage.apply(TextIO.Write.to("gs://pubsub-outputs/1"));
pipeline.run();
}
:
は、ここに私のコードです。
The DataflowPipelineRunner in streaming mode does not support TextIO.Write.
上記を実行するためのコードは何ですか?
可能な複製(http://stackoverflow.com/questions/40402150/creating-a-custom-sink-in-data-flow) – jkff