2017-02-26 3 views
3

私はkafkaキューからデータを読み込んでいるスタンドアロンのスパーククラスタを持っています。 kafkaキューには5つのパーティションがあり、スパークはパーティションの1つのデータのみを処理しています。私は、次を使用しています:ここでKafka - Spark Streaming - 1つのパーティションからのデータの読み出し

は私のMavenの依存関係です:

<dependencies> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
     <version>2.0.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.0.2</version> 
    </dependency> 
    <dependency> 
     <groupId>kafka-custom</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.1.1</version> 
    </dependency> 

マイカフカのプロデューサーは単にキューに100件のメッセージを入れている単純なプロデューサである:ここでは

public void generateMessages() { 

    // Define the properties for the Kafka Connection 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", kafkaBrokerServer); // kafka server 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer", 
      "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", 
      "org.apache.kafka.common.serialization.StringSerializer"); 

    // Create a KafkaProducer using the Kafka Connection properties 
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
      props); 
    for (int i = 0; i < 100; i++) { 
     ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopic, "value-" + i); 
     producer.send(record); 
    } 
    producer.close(); 

} 

です私のスパークストリーミングジョブのメインコード:

public void processKafka() throws InterruptedException { 
    LOG.info("************ SparkStreamingKafka.processKafka start"); 

    // Create the spark application 
    SparkConf sparkConf = new SparkConf(); 
    sparkConf.set("spark.executor.cores", "5"); 

    //To express any Spark Streaming computation, a StreamingContext object needs to be created. 
    //This object serves as the main entry point for all Spark Streaming functionality. 
    //This creates the spark streaming context with a 'numSeconds' second batch size 
    jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchInterval)); 


    //List of parameters 
    Map<String, Object> kafkaParams = new HashMap<>(); 
    kafkaParams.put("bootstrap.servers", this.getBrokerList()); 
    kafkaParams.put("client.id", "SpliceSpark"); 
    kafkaParams.put("group.id", "mynewgroup"); 
    kafkaParams.put("auto.offset.reset", "earliest"); 
    kafkaParams.put("enable.auto.commit", false); 
    kafkaParams.put("key.deserializer", StringDeserializer.class); 
    kafkaParams.put("value.deserializer", StringDeserializer.class); 

    List<TopicPartition> topicPartitions= new ArrayList<TopicPartition>(); 
    for(int i=0; i<5; i++) { 
     topicPartitions.add(new TopicPartition("mytopic", i)); 
    } 


    //List of kafka topics to process 
    Collection<String> topics = Arrays.asList(this.getTopicList().split(",")); 


    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
      jssc, 
      LocationStrategies.PreferConsistent(), 
      ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) 
     ); 

    //Another version of an attempt 
    /* 
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
     jssc, 
     LocationStrategies.PreferConsistent(), 
     ConsumerStrategies.<String, String>Assign(topicPartitions, kafkaParams) 
    ); 
    */ 

    messages.foreachRDD(new PrintRDDDetails()); 


    // Start running the job to receive and transform the data 
    jssc.start(); 

    //Allows the current thread to wait for the termination of the context by stop() or by an exception 
    jssc.awaitTermination(); 
} 

PrintRDDDetailsの呼び出し方法には次のようになります。

public void call(JavaRDD<ConsumerRecord<String, String>> rdd) 
     throws Exception { 

    LOG.error("--- New RDD with " + rdd.partitions().size() 
      + " partitions and " + rdd.count() + " records"); 

} 

1つのパーティションからデータが取得されるだけです。私はカフカで5つのパーティションがあることを確認しました。コールメソッドが実行されると、適切な数のパーティションが出力されますが、1つのパーティションにあるレコードだけが出力されます。この単純なコードから取り出した処理は、1つのパーティションのみを処理していることを示します。

答えて

4

これは、それが(以下プル要求に従って)カフカ-クライアントライブラリのv0.10.1を使っているようスパーク2.1.0に問題があるようです:

https://github.com/apache/spark/pull/16278

私が使用してこれを中心に働いていました新しいバージョンのkafka-clientsライブラリ:

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core"     % sparkVersion, 
    "org.apache.spark" %% "spark-streaming"    % sparkVersion, 
    "org.apache.spark" %% "spark-sql"     % sparkVersion, 
    "org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion, 
    "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % sparkVersion, 
).map(_.exclude("org.apache.kafka", "kafka-clients")) 

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.0" 
+0

ありがとうございます - 私はそれを試してみましょう。 – Erin

+0

Param - うまくいった!どうもありがとうございました。 – Erin

+0

恐ろしい!私は嬉しいです:) – Param

関連する問題