2017-03-06 10 views
1

kafka + sparkストリーミングを使用してアプリケーションを構築したい場合は、アプリはmutilpeトピックのデータを受け取ります。私は、トピック+メッセージを発するが、初めに、私はこのFUNCにfromOffsetsパラメータを渡す必要があります方法kafkaの最新のオフセットを取得する方法

def createDirectStream[ 
K: ClassTag, 
V: ClassTag, 
KD <: Decoder[K]: ClassTag, 
VD <: Decoder[V]: ClassTag, 
R: ClassTag] (
    ssc: StreamingContext, 
    kafkaParams: Map[String, String], 
    fromOffsets: Map[TopicAndPartition, Long], 
    messageHandler: MessageAndMetadata[K, V] => R 
) 

を使用するwnat。問題は、トピックの最新のオフセットがわからないということです。オフセットを関数に渡すことができるように、どうすればよいのでしょうか。カフカにはまだメッセージはありません。

答えて

0

カフカにメッセージがまだ存在しない場合は、各パーティションのオフセットあなたはオフセットなしで起動したい場合は0である、あなたはfromOffsets: Map[TopicAndPartition, Long]パラメータをとらないオーバーロードを使用することができます。

def createDirectStream[ 
    K: ClassTag, 
    V: ClassTag, 
    KD <: Decoder[K]: ClassTag, 
    VD <: Decoder[V]: ClassTag](
    ssc: StreamingContext, 
    kafkaParams: Map[String, String], 
    topics: Set[String] 
) 

あなたはストリーミングジョブの実行を開始するとき、あなたはHasOffsetRangesRDDをキャストすることによりオフセットを抽出することができます。

inputDStream.transform { 
    rdd => 
    val offsets = rdd.asInstanceOf[HasOffsetRanges] 
    // Save offsets 
} 
関連する問題