これはかなり簡潔で、いくつかの注意点があります。まず、これがカフカ側からどのように作用するかを理解するのに役立ちます。
カフカは、オフセットと呼ばれるものを管理します。カフカの各メッセージには、パーティション内の位置に対するオフセットがあります。パーティションの最初のメッセージは、オフセットが0L
であり、2つ目が1L
などです。それ以外は、ログのロールオーバーとトピック圧縮のため、0L
は必ずしも最も古いオフセットではありません。パーティション。
最初に行う必要があるのは、最初から読み込みたいすべてのパーティションのオフセットを収集することです。ここでは、これを行う機能があります:
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = {
val time = kafka.api.OffsetRequest.LatestTime
val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo](
(new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000))
)
val req = new kafka.javaapi.OffsetRequest(
reqInfo, kafka.api.OffsetRequest.CurrentVersion, "test"
)
val resp = consumer.getOffsetsBefore(req)
val offsets = resp.offsets(topic, partition)
(offsets(offsets.size - 1), offsets(0))
}
あなたはこのようにそれを呼び出します。あなたはカフカからのオフセットを取得について知りたいことすべて、read thisについては
val (firstOffset,nextOffset) = getOffsets(consumer, "MyTopicName", 0)
。最小限に抑えるためにはわかりません。 (あなたが完全例えばPartitionOffsetRequestInfo
の二番目の引数を、理解したときに私に教えてください。)
今、あなたはあなたが歴史的に見てみたいパーティションのfirstOffset
とlastOffset
を持っていることを、あなたはそれからである、createDirectStream
のfromOffset
パラメータを使用しますタイプ:fromOffset: Map[TopicAndPartition, Long]
。 getOffsets()
から得たfirstOffset
にLong
/値を設定します。
nextOffset
については、これを使用して、履歴データの処理から新しいデータへの移行時にストリーム内で判断できます。 msg.offset == nextOffset
の場合、パーティション内の最初の非履歴レコードを処理しています。注意事項については、今
、直接from the documentation:コンテキストが開始された後
- 、新たなストリーミング計算は を設定したり、それに追加することができます。
- コンテキストが停止すると、 を再起動できません。
- 同じ時刻に のJVMでアクティブにできるStreamingContextは1つだけです。
- StreamingContextのstop()もSparkContextを停止します。 にStreamingContextのみを停止するには、stopSparkContextというオプションのstop() のパラメータをfalseに設定します。
- 次のStreamingContextが作成される前に、 StreamingContextが(SparkContextを停止せずに)停止されている限り、SparkContextを に再作成して複数のStreamingContextを作成することができます。
それはので、私はfirstOffset
と同時にnextOffset
をつかむこれらの警告のだ - 私は、ストリームのアップを維持するが、それに歴史的提示時間処理からコンテキストを変更することができます。
詳細な回答ありがとうございます。私はそれを読んで、それをよく理解するためにそれを再び読むでしょう。一方、Sparkの「スライディングウィンドウ」を使用して時間をカウントします。どうすればこれを達成できますか? – yolgun