2016-10-26 5 views
0

kafkaとsaprkを正常に統合しました。 私はkafkaからsparkにストリーミングを送りたいと思います。今はsparkにストリームを送ることができます。このストリームをRDDに入れたいので、rddを作成するcreateRDD()関数を使います。 しかし、私はrddでカフカからのマシーゼーションしか得ていません。 kafka-spark CreateRDD()関数でoffsetRange()をどのように設定するか教えてください。カフカのトピックpartion用 pyspark kafka streamingpyspark rdd kafkaのoffsetRange()関数を設定

まずセットoffsetranges

pyspark.streaming.kafka.OffsetRange(topic, partition, fromOffset, untilOffset) 

Initialiを使用して:

答えて

0

だけのドキュメントによると、あなたのコードスニペット

// Import dependencies and create kafka params as in Create Direct Stream 

    val offsetRanges = Array(
     // topic, partition, inclusive starting offset, exclusive ending offset 
     OffsetRange("test", 0, 0, 100), 
     OffsetRange("test", 1, 0, 100) 
    ) 

    val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent) 

Spark Kafka Integration guide

に使用zationは、次のようになります。

fromOffset = 0 
untilOffset = 10 
partition = 0 
topic = 'topic' 
offset = OffsetRange(topic, partition, fromOffset, untilOffset) 
offsets = [offset] 

次に、あなたがあなたのRDD

kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets) 
+0

ヤスを作成できるようになりますが、どのようにスパークのバージョンが何であるかをpyspark –

+0

でそれを使用するには、使用していますか? – FaigB

+0

私はスパーク1.5.1を使用しています –

関連する問題