- >ストリームの数に制限はなく、ジョブマネージャ/タスクマネージャのメモリ/ CPU、並列化が使用されているかどうか、およびスロット数に応じて調整されます。私はYARNを使ってリソースを管理しています。接続されているストリームの数が多い場合は、プロセスの速度が低下するため、一部のタスクマネージャで処理の一部または全部が行われていないことに少し注意する必要があります。カフカストリームそのものに遅れがある場合や、負荷が重いタスクマネージャが間違いなく発生する可能性があり、そのために予防チェックを実行する必要があるため、内部遅れが発生する可能性があります。
- >最新のFlinkバージョンの一部としてContinuous Queriesサポートが構築されているため、Flinkのドキュメントを確認できます。
- >あるデータストリームを別のストリームに読み込むことによって、2つのストリームをフリンクの用語で接続することを意味する場合、それらを共通のキーに接続して値の状態を維持できます。値状態はタスクマネージャで維持され、タスクマネージャ間で共有されないことに注意してください。そうでない場合、2つ以上のストリームの和集合を意味する場合、フラットマップ関数を構築して、そのようなストリームからのデータが標準形式になるようにすることができます。組合の
例: ヴァル・ストリーム1:でDataStream [UserBookingEvent] = BookingClosure.getSource(ランモード).getSource(ENV) .MAP(新しいClosureMapFunction)
ヴァルのSTREAM2:でDataStream [UserBookingEvent] = BookingCancel.getSource (ランモード).getSource(ENV) .MAP(新しいCancelMapFunction)
ヴァルunionStream:でDataStream [UserBookingEvent] = stream1.union(ストリーム2)
import org.apache.flink.api.common.functions.MapFunction
import org.json4s.jackson.JsonMethods.{parse => _, parseOpt => _}
import org.json4s.native.JsonMethods._
import org.slf4j.{Logger, LoggerFactory}
class CancelMapFunction extends MapFunction[String, Option[UserBookingEvent]] {
override def map(in: String): Option[UserBookingEvent] = {
val LOG: Logger = LoggerFactory.getLogger(classOf[CancelMapFunction])
try {
implicit lazy val formats = org.json4s.DefaultFormats
val json = parse(in)
..............
} catch {
case e: Exception => {
LOG.error("Could not parse Cancel Event= " + in + e.getMessage)
None
}
}