2016-12-28 6 views
1

Scala 2.10.6とSpark 1.6.2を使用して、カフカトピックからのメッセージを消費したいと思います。カフカのために私はこの依存関係を使用しています:ScalaのKafkaコンシューマをコンパイルできません

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming_2.10</artifactId> 
    <version>1.6.2</version> 
</dependency> 

をこのコードは罰金コンパイル、しかし私はauto.offset.resetを定義したいとここで問題が発生:

val topicMap = topic.split(",").map((_, kafkaNumThreads.toInt)).toMap 
val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, 
          StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2) 

私はkafkaParamsを追加すると、それはもうコンパイルされません。

val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> group, 
"zookeeper.connection.timeout.ms" -> "10000", 
"auto.offset.reset" -> "smallest") 

val data = KafkaUtils.createStream(ssc, kafkaParams, topicMap, 
           StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2) 

エラーメッセージ:

94: error: missing parameter type for expanded function ((x$3) => x$3._2) 
[ERROR]             StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2) 

私はcreateStreamのパラメータの多くの異なる組み合わせを試しましたが、すべてが失敗します。誰か助けてもらえますか?

答えて

1

タイプパラメータをKafkaUtils.createStreamに追加すると、ストリームの基本タイプを解決できます。たとえば、キーと値のタイプがStringの場合:

val data: DStream[String] = 
    KafkaUtils 
    .createStream[String, String, StringDecoder, StringDecoder](
     ssc, 
     kafkaParams, 
     topicMap, 
     StorageLevel.MEMORY_AND_DISK_SER_2 
).map(_._2) 
+0

[OK]をクリックします。確かに問題は、私がリモートのKafkaキューからメッセージを消費したいということです。私はそれらを 'curl'コマンドとコンフルエントなAPIを使って端末から得ることができます。しかし、私がScalaコードを実行すると、私はそれらを取得しません。だから、私の前提はオフセットを指定する必要があるということです。 – Dinosaurius

+0

@Dinosauriusあなたの質問のエラーは、オフセットとは関係ありません。これは単にコンパイラが正しい型を推論できないことです。 –

+0

ええ、私は知っています。オフセットを設定する必要がある理由を説明したかっただけです。 – Dinosaurius

関連する問題