2017-12-03 3 views
0

Flickinkで、ウィンドウベースの平均(または私が定義した他の関数)を履歴イベントに基づいたストリームで計算したいので、ストリームはEvent-Times (時間ベースを処理していない):ときFlinkとEvent-Time-Basedストリームで平均値を計算する

ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis) 

しかし、私は計算(適用機能)を行うときには動作しません:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

私は、摂取時にタイムスタンプを追加する方法を発見しました私はEventTimeなしで同じ方法でそれをやっています。私は設定しなければならないウォーターマークについて何かを読んだ:

val avg = stream 
    .keyBy("instrument") 
    .timeWindow(Time.seconds(10)) 
    .apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint])=>{ 
    val avg = values.map(_.val).sum/values.size 
    val dp = Datapoint(key.getField[String](0), avg) 
    out.collect(dp) 
    }) 

avg.print() 
env.execute() 

誰かに簡単なScalaの例がありますか?

よろしく、
アンドレアス

答えて

0

透かしは、以前のタイムスタンプを持つすべてのイベントは、(おそらく)がすでに到着しているアサーションと効果的にタイムスタンプです。イベント時間に基づくWindowsは、ウィンドウが完了したときを知るウォーターマークに依存します。これまでの最も一般的な電子透かし戦略は、事象が何らかの制限された遅延で到着すると想定することである。

あなたは(摂取時)のデータソースに透かしを発するようにしたい場合は、Source Functions with Timestamps and Watermarksを参照してください、しかし一方で、あなたはこの外に対処したい、場合には、

ctx.emitWatermark(new Watermark(datapoint.getWatermarkTime)) 

ような単純なものでしょう出典:Timestamp Assigners/Watermark GeneratorsおよびAssigners allowing a fixed amount of latenessを参照してください。

stream 
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Datapoint](Time.seconds(10))(_.getTimestamp)) 
    .keyBy("instrument") 
    ... 

私がリンクしたドキュメントには、Scalaのより詳細な例があります。

+0

ありがとうございます! btw:Apache Flinkの良い本やチュートリアルを知っていますか? –

+0

@AndreasVogler私はこの本があなたに役立つと思う:https://data-artisans.com/download-introduction-apache-flink-book –

+0

http://training.data-artisans.com/の演習はすべきである役に立った –