2017-02-15 6 views
1

sparkがストリームを作成し、kafka 2-patitionトピックの1つのパーティションのみからメッセージを取得できる状況が発生しました。Spark Structured Streamは、Kafkaの1つのパーティションからのメッセージを取得します

マイトピック: C:\bigdata\kafka_2.11-0.10.1.1\bin\windows>kafka-topics --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic test4

カフカプロデューサー:

public class KafkaFileProducer { 

// kafka producer 
Producer<String, String> producer; 

public KafkaFileProducer() { 

    // configs 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("acks", "all"); 
    //props.put("group.id", "testgroup"); 
    props.put("batch.size", "16384"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("linger.ms", "0"); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("block.on.buffer.full", "true"); 

    // instantiate a producer 
    producer = new KafkaProducer<String, String>(props); 
} 

/** 
* @param filePath 
*/ 
public void sendFile(String filePath) { 
    FileInputStream fis; 
    BufferedReader br = null; 

    try { 
     fis = new FileInputStream(filePath); 

     //Construct BufferedReader from InputStreamReader 
     br = new BufferedReader(new InputStreamReader(fis)); 

     int count = 0; 

     String line = null; 
     while ((line = br.readLine()) != null) { 
      count ++; 
      // dont send the header 
      if (count > 1) { 
       producer.send(new ProducerRecord<String, String>("test4", count + "", line)); 
       Thread.sleep(10); 
      } 
     } 

     System.out.println("Sent " + count + " lines of data"); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    }finally{ 
     try { 
      br.close(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 

     producer.close(); 
    } 
} 

}

スパークストラクチャード・ストリーム:

System.setProperty("hadoop.home.dir", "C:\\bigdata\\winutils"); 

    final SparkSession sparkSession = SparkSession.builder().appName("Spark Data Processing").master("local[2]").getOrCreate(); 

    // create kafka stream to get the lines 
    Dataset<Tuple2<String, String>> stream = sparkSession 
      .readStream() 
      .format("kafka") 
      .option("kafka.bootstrap.servers", "localhost:9092") 
      .option("subscribe", "test4") 
      .option("startingOffsets", "{\"test4\":{\"0\":-1,\"1\":-1}}") 
      .option("failOnDataLoss", "false") 
      .load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as(Encoders.tuple(Encoders.STRING(), Encoders.STRING())); 

    Dataset<String> lines = stream.map((MapFunction<Tuple2<String, String>, String>) (Tuple2<String, String> tuple) -> tuple._2, Encoders.STRING()); 
    Dataset<Row> result = lines.groupBy().count(); 
    // Start running the query that prints the running counts to the console 
    StreamingQuery query = result//.orderBy("callTimeBin") 
      .writeStream() 
      .outputMode("complete") 
      .format("console") 
      .start(); 


    // wait for the query to finish 
    try { 
     query.awaitTermination(); 
    } catch (StreamingQueryException e) { 
     e.printStackTrace(); 
    } 

私は100行を送信するためにプロデューサーを実行すると、ファイル、クエリは51行だけを返しました。私は、スパークのデバッグログを読み、以下のような何かに気づいた:

17/02/15 10:52:49 DEBUG StreamExecution: Execution stats: ExecutionStats(Map(),List(),Map(watermark -> 1970-01-01T00:00:00.000Z)) 
17/02/15 10:52:49 DEBUG StreamExecution: Starting Trigger Calculation 
17/02/15 10:52:49 DEBUG KafkaConsumer: Pausing partition test4-1 
17/02/15 10:52:49 DEBUG KafkaConsumer: Pausing partition test4-0 
17/02/15 10:52:49 DEBUG KafkaSource: Partitions assigned to consumer: [test4-1, test4-0]. Seeking to the end. 
17/02/15 10:52:49 DEBUG KafkaConsumer: Seeking to end of partition test4-1 
17/02/15 10:52:49 DEBUG KafkaConsumer: Seeking to end of partition test4-0 
17/02/15 10:52:49 DEBUG Fetcher: Resetting offset for partition test4-1 to latest offset. 
17/02/15 10:52:49 DEBUG Fetcher: **Fetched {timestamp=-1, offset=49} for partition test4-1 
17/02/15 10:52:49 DEBUG Fetcher: Resetting offset for partition test4-1 to earliest offset. 
17/02/15 10:52:49 DEBUG Fetcher: Fetched {timestamp=-1, offset=0} for partition test4-1** 
17/02/15 10:52:49 DEBUG Fetcher: Resetting offset for partition test4-0 to latest offset. 
17/02/15 10:52:49 DEBUG Fetcher: Fetched {timestamp=-1, offset=51} for partition test4-0 
17/02/15 10:52:49 DEBUG KafkaSource: Got latest offsets for partition : Map(test4-1 -> 0, test4-0 -> 51) 
17/02/15 10:52:49 DEBUG KafkaSource: GetOffset: ArrayBuffer((test4-0,51), (test4-1,0)) 
17/02/15 10:52:49 DEBUG StreamExecution: getOffset took 0 ms 
17/02/15 10:52:49 DEBUG StreamExecution: triggerExecution took 0 ms 

test4-1オフセットealiest常にリセットされ、なぜ私にはわかりません。

誰かがすべてのパーティションからすべてのメッセージを取得する方法を知っていれば、私は非常に感謝します。 おかげで、

答えて

4

0.10.1で知らカフカの問題があります*クライアント:https://issues.apache.org/jira/browse/KAFKA-4547

今あなたは回避策として0.10.0.1クライアントを使用することができます。それはKafka 0.10.1。*クラスターと話すことができます。

詳細はhttps://issues.apache.org/jira/browse/SPARK-18779を参照してください。

+0

ありがとうございます、今、0.10.0.1クライアントを使用して動作します。 – taniGroup

+0

私は同じ問題をあなたの答えに従って解決しました、ありがとう。 – Mekal

関連する問題