2016-04-19 7 views
0

FlinkのTimeWindow機能を使用して計算を実行しています。私は5分Windowを作成しています。しかし、私は最初に1時間だけWindowを作成したいと思います。私が必要とする次のWindowsは5分です。FlinkのカスタムWindowsチャージ

最初の1時間は、データが収集され、その操作が実行されます。これが行われると、5分ごとに同じ操作が実行されます。

これはtriggerで実装できるとわかりますが、私はどのように使うべきか、どのようにtriggerを使うべきかわかりません。

UPDATE:私は、彼らはただwindowごとにトリガ時間/回数を定義し、私が得ることができるものから、さえtriggersが有用であるとは思わないではなく、最初のwindowがトリガされる場合。

+0

私はこれが現在のDataStream APIで可能であるとは思っていません。ウィンドウを定義する場合、定義は実行時にビルドされるすべてのインスタンスに対して "同じ"です...唯一の方法は、カスタム演算子を定義して、それを '.transform(...) 'によってプログラムに追加することです。しかし、これは正しいことをするのはかなり面倒です。 –

答えて

2

これは実装するのは簡単ではありません。

KeyedStreamが指定されている場合、GlobalWindowと、最初に起動したかどうかを「覚えている」カスタムステートフルTriggerを使用する必要があります。 GlobalWindowTriggerについて

val stream: DataStream[(String, Int)] = ??? 
val result = stream 
    .keyBy(0) 
    .window(GlobalWindows.create()) 
    .trigger(new YourTrigger()) 
    .apply(new YourWindowFunction()) 

詳細はFLINK Window documentationです。

関連する問題