2016-10-10 7 views
0

私はApache Flinkを初めて使用しており、FlinkストリーミングジョブをKafka側でスケーリングする方法に関するベストプラクティスを理解しようとしています。私は適切な答えを見つけることができないいくつかの質問が含まれています:Apache Flinkストリームスケーリング

  1. いくつのストリーミングジョブを実行する必要がありますか?ストリームが多すぎるとスケーラビリティの問題はありますか?どれくらいのものがいくらですか?
  2. ビジネスニーズを満たすために2,000ストリームを稼働させる場合、これらのストリームを管理する最良の方法は何ですか?
  3. あるストリームから別のストリームへストリームデータを読み取るための好ましい方法は何ですか?ストリームに参加したり、連続したクエリを実行したりできますか?

これらの質問が多少基本的だと思われる場合は、事前にお詫びし、お詫び申し上げますが、私はこの技術をよりよく扱おうとしています。私は多くのドキュメントを読んだことがありますが、この分野での私の経験不足のためにいくつかの概念をまとめているわけではないかもしれません。助けてくれてありがとう!

答えて

1

- >ストリームの数に制限はなく、ジョブマネージャ/タスクマネージャのメモリ/ CPU、並列化が使用されているかどうか、およびスロット数に応じて調整されます。私はYARNを使ってリソースを管理しています。接続されているストリームの数が多い場合は、プロセスの速度が低下するため、一部のタスクマネージャで処理の一部または全部が行われていないことに少し注意する必要があります。カフカストリームそのものに遅れがある場合や、負荷が重いタスクマネージャが間違いなく発生する可能性があり、そのために予防チェックを実行する必要があるため、内部遅れが発生する可能性があります。

- >最新のFlinkバージョンの一部としてContinuous Queriesサポートが構築されているため、Flinkのドキュメントを確認できます。

- >あるデータストリームを別のストリームに読み込むことによって、2つのストリームをフリンクの用語で接続することを意味する場合、それらを共通のキーに接続して値の状態を維持できます。値状態はタスクマネージャで維持され、タスクマネージャ間で共有されないことに注意してください。そうでない場合、2つ以上のストリームの和集合を意味する場合、フラットマップ関数を構築して、そのようなストリームからのデータが標準形式になるようにすることができます。組合の

例: ヴァル・ストリーム1:でDataStream [UserBookingEvent] = BookingClosure.getSource(ランモード).getSource(ENV) .MAP(新しいClosureMapFunction)

ヴァルのSTREAM2:でDataStream [UserBookingEvent] = BookingCancel.getSource (ランモード).getSource(ENV) .MAP(新しいCancelMapFunction)

ヴァルunionStream:でDataStream [UserBookingEvent] = stream1.union(ストリーム2)

import org.apache.flink.api.common.functions.MapFunction 
import org.json4s.jackson.JsonMethods.{parse => _, parseOpt => _} 
import org.json4s.native.JsonMethods._ 
import org.slf4j.{Logger, LoggerFactory} 

class CancelMapFunction extends MapFunction[String, Option[UserBookingEvent]] { 
    override def map(in: String): Option[UserBookingEvent] = { 
    val LOG: Logger = LoggerFactory.getLogger(classOf[CancelMapFunction]) 
    try { 
     implicit lazy val formats = org.json4s.DefaultFormats 

     val json = parse(in) 
     .............. 
    } catch { 
     case e: Exception => { 
     LOG.error("Could not parse Cancel Event= " + in + e.getMessage) 
     None 
     } 
    } 
関連する問題