2016-09-16 3 views
1

を開始し得る次のように私はtimewindowの上に(1を加算)のカウントを計算しています:FLINK timeWindowは時間

mappedUserTrackingEvent 
      .keyBy("videoId", "userId") 
      .timeWindow(Time.seconds(30)) 
      .sum("count") 

私は実際にはあまりにもキーフィールドとしてウィンドウの開始時間を追加したいと思います。結果は次のようなものになります。

key: videoId=123,userId=234,time=2016-09-16T17:01:30 
value: 50 

したがって、本質的にウィンドウごとに集計します。 End Goalは、これらのウィンドウのヒストグラムを描画します。

ウィンドウの先頭をキーのフィールドとして追加するにはどうすればよいですか?この場合、ウィンドウを00または30に揃えますか?それは可能ですか?

答えて

2

WindowFunctionapply()方法は、あなたがkeyBy().timeWindow()を使用する場合TimeWindowあるWindowオブジェクトを提供します。 TimeWindowオブジェクトには、ウィンドウの開始と終了のタイムスタンプをそれぞれ返す2つのメソッド、getStart()getEnd()があります。

今のところsum()アグリゲーションをWindowFunctionと一緒に使用することはできません。

mappedUserTrackingEvent 
     .keyBy("videoId", "userId") 
     .timeWindow(Time.seconds(30)) 
     .apply(new MySumReduceFunction(), new MyWindowFunction());` 

MySumReduceFunctionReduceFunctionインタフェースを実装し、インクリメンタルウィンドウに到着要素を集計して合計を計算:あなたのような何かをする必要があります。 MyWindowFunctionWindowFunctionを実装しています。 Iterableパラメータを介して集約値を受け取り、TimeWindowパラメータから取得したタイムスタンプで値を強化します。

0

sumの代わりにaggregateメソッドを使用できます。
aggregateでは、secondlyパラメータの実装はWindowFunctionまたはProcessWindowFunctionです。

mappedUserTrackingEvent 
    .keyBy("videoId", "userId") 
    .timeWindow(Time.seconds(30)) 
    .aggregate(new Count(), new MyProcessWindowFunction(); 

public static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, Tuple2<Long, Integer>, Tuple, TimeWindow> 
{ 
    @Override 
    public void process(Tuple tuple, Context context, Iterable<Integer> iterable, Collector<Tuple2<Long, Integer>> collector) throws Exception 
    { 
     context.currentProcessingTime(); 
     context.window().getStart(); 
    } 
} 
:私はFLINK-1.4.0を使用しています
は、同様に、 ProcessWindowFunctionを使用することをお勧めします
関連する問題