0

私はパターンの下に使用してFLINKに> 10あるすべての一時イベントをフィルタしようとしています、warnings.print()(最初の最後のイベント)逆の順序でイベントを出力しますが、ApacheのFLINK CEPで最初のイベントをexceptsで

Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("first") 
       .subtype(TemperatureEvent.class) 
       .where(new FilterFunction<TemperatureEvent>() { 
        @Override 
        public boolean filter(TemperatureEvent temperatureEvent) throws Exception { 
         return temperatureEvent.getTemperature() > 50; 
        } 
       }); 

入力は入力機能でストリーミングするために解析されるテキストファイルです、入力ファイルの内容は次のとおりです。 -

1,98 
2,33 
3,44 
4,55 
5,66 
6,88 
7,99 
8,76 

ここで最初の値はRack_idであり、第二は、私は、印刷()上を発行した温度

です入力ストリームとWarnigsStream両方

inputEventStream.print(); 
warnings.print(); 

以下のように今、問題が来る、我々が見ることができるようにFLINK CEPの出力は

08/10/2017 23:43:15 Job execution switched to status RUNNING. 
08/10/2017 23:43:15 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
08/10/2017 23:43:15 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
08/10/2017 23:43:15 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED 
08/10/2017 23:43:15 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING 
08/10/2017 23:43:15 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to RUNNING 
08/10/2017 23:43:15 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING 
Rack id = 1 and temprature = 98.0) 
Rack id = 2 and temprature = 33.0) 
Rack id = 3 and temprature = 44.0) 
Rack id = 4 and temprature = 55.0) 
Rack id = 5 and temprature = 66.0) 
Rack id = 6 and temprature = 88.0) 
Rack id = 7 and temprature = 99.0) 
Rack id = 8 and temprature = 76.0) 
08/10/2017 23:43:16 Source: Custom Source -> Sink: Unnamed(1/1) switched to FINISHED 
Rack id = 1 and temprature = 98.0) 
Rack id = 8 and temprature = 76.0) 
Rack id = 7 and temprature = 99.0) 
Rack id = 6 and temprature = 88.0) 
Rack id = 5 and temprature = 66.0) 
Rack id = 4 and temprature = 55.0) 
08/10/2017 23:43:16 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to FINISHED 
08/10/2017 23:43:16 Job execution switched to status FINISHED. 

Process finished with exit code 0 

の下に表示され、最初の複雑なイベント(ラックID = 1および温度= 98.0))が同じ順序で印刷されますが、その後、temp> 50の他のすべての複雑なイベントは、入力ストリームに対して逆の順序で印刷されます。この問題は、タイムスタンプと透かしを割り当てることによって解決された

答えて

0

予め

My questions are :- 

1. Any idea why events are getting printed in reverse order? 
2. Is there a custom way to print values{w/o using warnings.print()} of 
    warning stream, like can I print only temperature, rather than rack-id ? 

おかげで、生成された出力は

08/11/2017 00:45:09 Job execution switched to status RUNNING. 
    08/11/2017 00:45:09 Source: Custom Source -> Timestamps/Watermarks(1/1) switched to SCHEDULED 
    08/11/2017 00:45:09 Source: Custom Source -> Timestamps/Watermarks(1/1) switched to DEPLOYING 
    08/11/2017 00:45:09 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED 
    08/11/2017 00:45:09 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING 
    08/11/2017 00:45:09 Source: Custom Source -> Timestamps/Watermarks(1/1) switched to RUNNING 
    08/11/2017 00:45:09 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to RUNNING 
    Rack id = 1 and temprature = 98.0) 
    Rack id = 4 and temprature = 55.0) 
    Rack id = 5 and temprature = 66.0) 
    Rack id = 6 and temprature = 88.0) 
    Rack id = 7 and temprature = 99.0) 
    Rack id = 8 and temprature = 76.0) 
    08/11/2017 00:45:10 Source: Custom Source -> Timestamps/Watermarks(1/1) switched to FINISHED 
    08/11/2017 00:45:10 AbstractCEPPatternOperator -> Map -> Sink: Unnamed(1/1) switched to FINISHED 
    08/11/2017 00:45:10 Job execution switched to status FINISHED. 
下に示されている
// Input stream of monitoring events 
     DataStream<MonitoringEvent> inputEventStream = env 
       .addSource(new InputStreamAGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); 

以下のように示される入力ストリームします

関連する問題