2017-12-19 8 views
1

私はEventTimeに基づいてウィンドウをテストする簡単な例を実行しています。私は処理時間で出力を生成することができますが、私がEventTimeを使用しているときに出力はありません。私が間違っていることを理解するのを助けてください。Flinkストリーミングイベント時間ウィンドウ

5秒ごとにスライドするサイズ10秒のSlidingWindowを作成しています。ウィンドウの最後に、その時間に受信したメッセージ数が表示されます。

input : 
a,1513695853 (generated at 13th second, received at 13th second) 
a,1513695853 (generated at 13th second, received at 13th second) 
a,1513695856 (generated at 16th second, received at 19th second) 
a,1513695859 (generated at 13th second, received at 19th second) 

第2フィールドは、イベントのタイムスタンプを表し、第13、第13、第16、第19秒を表します。出力なしより

if i am using Processing Time window : 

Output : 
(a,1) 
(a,3) 
(a,2) 

しかし、私はイベント時間を使用していた印刷です。間違っていることを理解するのを助けてください。

package org.apache.flink.window.training; 

import java.io.InputStream; 
import java.util.Properties; 

import org.apache.flink.api.common.functions.FoldFunction; 
import org.apache.flink.api.common.functions.MapFunction; 
import org.apache.flink.api.java.functions.KeySelector; 
import org.apache.flink.api.java.tuple.Tuple2; 
import org.apache.flink.streaming.api.TimeCharacteristic; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; 
import org.apache.flink.streaming.api.watermark.Watermark; 
import org.apache.flink.streaming.api.windowing.time.Time; 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; 

import com.fasterxml.jackson.databind.ObjectMapper; 

public class SocketStream { 


    private static Properties properties = new Properties(); 

    public static void main(String args[]) throws Exception { 
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

    InputStream inputStream = 
     SocketStream.class.getClassLoader().getResourceAsStream("local-kafka-server.properties"); 

    properties.load(inputStream); 

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

    FlinkKafkaConsumer010<String> consumer = 
     new FlinkKafkaConsumer010<>("test-topic", new SimpleStringSchema(), properties); 

    DataStream<Element> socketStockStream = 
     env.addSource(consumer).map(new MapFunction<String, Element>() { 
      @Override 
      public Element map(String value) throws Exception { 

      String split[] = value.split(","); 
      Element element = new Element(split[0], Long.parseLong(split[1])); 

      return element; 
      } 
     }).assignTimestampsAndWatermarks(new TimestampExtractor()); 

    socketStockStream.map(new MapFunction<Element, Tuple2<String, Integer>>() { 

     @Override 
     public Tuple2<String, Integer> map(Element value) throws Exception { 

     return new Tuple2<String, Integer>(value.getId(), 1); 
     } 
    }).keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5)) 
    .sum(1). 
    print(); 

    env.execute(); 
    } 

    public static class TimestampExtractor implements AssignerWithPunctuatedWatermarks<Element> { 

    private static final long serialVersionUID = 1L; 

    @Override 
    public long extractTimestamp(Element element, long previousElementTimestamp) { 

     return element.getTimestamp(); 
    } 

    @Override 
    public Watermark checkAndGetNextWatermark(Element lastElement, long extractedTimestamp) { 
     // TODO Auto-generated method stub 
     return null; 
    } 
    } 
} 

答えて

4

イベントタイム処理には、適切に生成されたtimestamps and watermarksが必要です。

コードのTimestampExtractorは透かしを生成しませんが、常にnullを返します。

+0

Thnks Fabianは透かし割り当てでうまく動作します...しかし、透かしをnullに割り当てるとまだ興味がありますが、透かし(t)として何が起こっているかは、t '

+1

「ヌル」を返すと、ウォーターマークが更新されないことを意味します。したがって、ウォーターマークは常に「Long.MIN_VALUE」に留まり、進捗しません。つまり、ウィンドウを計算することはできません。 –

関連する問題