Flickinkで、ウィンドウベースの平均(または私が定義した他の関数)を履歴イベントに基づいたストリームで計算したいので、ストリームはEvent-Times (時間ベースを処理していない):ときFlinkとEvent-Time-Basedストリームで平均値を計算する
ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis)
しかし、私は計算(適用機能)を行うときには動作しません:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
私は、摂取時にタイムスタンプを追加する方法を発見しました私はEventTimeなしで同じ方法でそれをやっています。私は設定しなければならないウォーターマークについて何かを読んだ:
val avg = stream
.keyBy("instrument")
.timeWindow(Time.seconds(10))
.apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint])=>{
val avg = values.map(_.val).sum/values.size
val dp = Datapoint(key.getField[String](0), avg)
out.collect(dp)
})
avg.print()
env.execute()
誰かに簡単なScalaの例がありますか?
よろしく、
アンドレアス
ありがとうございます! btw:Apache Flinkの良い本やチュートリアルを知っていますか? –
@AndreasVogler私はこの本があなたに役立つと思う:https://data-artisans.com/download-introduction-apache-flink-book –
http://training.data-artisans.com/の演習はすべきである役に立った –