2017-08-02 8 views
0

私はkafkaストリーミングAPI(Kakfaバージョン:0.10.2.0)を使って簡単な単語の数を試しています。Wordcount App gistKafkaストリームの単語数アプリケーション

./kafka-console-producer.sh -topic input-topic --broker-list localhost:9092

./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning 
アプリケーションを起動

し、すべてが正常に動作しているようだが、私は、コンソールプロデューサー内の一部の文字列を入力する場合、消費者は全く何も受信しない:私は両方のプロデューサーとコンソール消費者を実行していますよ。私は入力の簡単なのtoUpperCaseを行うためのアプリケーションを変更した場合、消費者は、ストリーム(大文字に変更された)罰金を受信:

//The following code works fine: val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase()) uppercasedWithMapValues.to("output-topic")

を私はワードカウントの例では何も受けていないよ、なぜ誰もが知っています?コンシューマにシリアライザを指定する必要がありますか?

➜ bin ./kafka-console-consumer.sh \ 
      --topic output-topic \ 
      --bootstrap-server localhost:9092 \ 
      --from-beginning                     
[2017-08-02 07:48:20,187]WARN Error while fetching metadata with correlation id 2 : 
{output-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) 
[2017-08-02 07:48:20,197] WARN The following subscribed topics are not assigned 
to any members in the group console-consumer-91651 : [output-topic] 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) 

^それので、7つのメッセージ

答えて

2

KStream作品の合計をCProcessed:私の最後のテストでは、コンソール、消費者は、私は、コンソールを介して送信されますが、出力下記参照、それらを示さなかったメッセージを処理しキャッシュを使用しません。 KTableの場合は少し待つか、cache.max.bytes.buffering0に設定する必要があります(プロダクションコードではありません)。

+0

恐ろしいです!それはトリックでした!どうもありがとうございました!私はカフカストリームの内部構造についてもっと読む必要があると思う。再びありがとう@Arek – ardlema

+0

私はあなたを助けてうれしい@ardlema :) – Arek