2017-02-23 28 views
1

私はストリーム処理とCEPをKafkaメッセージストリームで実行しようとしています。このために、最初にプロトタイプを実現するためにApache Igniteを選びました。しかし、私はキューに接続することはできません。Apache Ignite Kafka接続の問題

使用 kafka_2.11-0.10.1.0 のapache-発火ファブリック-1.8.0-binの

ビン/ zookeeper-server-start.shのconfig /飼育係。特性 ビン/ kafka-server-start.sh設定/ server.properties ビン/ kafka-topics.sh --create --zookeeperはlocalhost:2181 --replication因子1 --partitions 1 --topic試験

カフカは正常に動作し、私は消費者でテストしました。 その後、私は着火を開始し、次に私は春のブートコマンドラインアプリで次のように実行します。

KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>(); 

    Ignition.setClientMode(true); 

    Ignite ignite = Ignition.start(); 

    Properties settings = new Properties(); 
    // Set a few key parameters 
    settings.put("bootstrap.servers", "localhost:9092"); 
    settings.put("group.id", "test"); 
    settings.put("zookeeper.connect", "localhost:2181"); 
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

    // Create an instance of StreamsConfig from the Properties instance 
    kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings); 

    IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache"); 

    try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) { 
     // allow overwriting cache data 
     stmr.allowOverwrite(true); 

     kafkaStreamer.setIgnite(ignite); 
     kafkaStreamer.setStreamer(stmr); 

     // set the topic 
     kafkaStreamer.setTopic("test"); 

     // set the number of threads to process Kafka streams 
     kafkaStreamer.setThreads(1); 

     // set Kafka consumer configurations 
     kafkaStreamer.setConsumerConfig(config); 

     // set decoders 
     StringDecoder keyDecoder = new StringDecoder(null); 
     StringDecoder valueDecoder = new StringDecoder(null); 

     kafkaStreamer.setKeyDecoder(keyDecoder); 
     kafkaStreamer.setValueDecoder(valueDecoder); 

     kafkaStreamer.start(); 
    } finally { 
     kafkaStreamer.stop(); 
    } 

アプリケーションの起動時には、私が

2017年2月23日10得る:25:23.409は1388をWARN --- [メイン] kafka.utils.VerifiableProperties:プロパティbootstrap.serversは有効な 2017ではありません-02-23 10:25:23.410 INFO 1388 --- [main] kafka.utils.VerifiableProperties:プロパティgroup.idがテストにオーバーライドされます 2017-02-23 10:25:23.410 WARN 1388 --- [main] kafka.utils.VerifiableProperties:プロパティkey.deserializerが無効です 2017-02-23 10:25:23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties:プロパティkey.serializerが無効です 2017-0 2-23 10:25:23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties:プロパティvalue.deserializerが無効です 2017-02-23 10:25:23.411 WARN 1388 --- [main] kafka。 utils.VerifiableProperties:プロパティvalue.serializerが有効でない 2017年2月23日10:25:23.411 INFO 1388 --- [メイン] kafka.utils.VerifiableProperties:プロパティzookeeper.connectがローカルホストに上書きされます:2181

次に、

2017-02-23 10:25:24.057 WARN 1388 --- [r-finder-thread] kafka.client.ClientUtils $:トピックの相関ID 0のトピックメタデータを取得しています[Set(test)] fromブローカー[BrokerEndPoint(0、user.local、9092)]が失敗しました

java.nio.channels.ClosedChannelException:null at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)〜[kafka_2.11-0.10.0.1.jar:na] at kafka.producer.SyncProducer.liftedTree1 $ 1(SyncProducer.scala:80)〜[kafka_2.11-0.10.0.1.jar:na] at kafka.producer.SyncProducer.kafka $プロデューサー$ SyncProducer $$ doSend(SyncProducer.scala:79)〜[kafka_2.11 -0.10.0.1.jar:na] at kafka.producer.SyncProducer.send(SyncProducer.scala:124)〜[kafka_2.11-0.10.0.1.jar:na] kafka.client.ClientUtils $ .fetchTopicMetadata( ClientUtils.scala:59)[kafka_2.11-0.10.0.1.jar:na] kafka.client.ClientUtils $ .fetchTopicMetadata(ClientUtils.scala:94)[kafka_2.11-0.10.0.1.jar:na] コンストラクターkafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)[kafka_2.11-0.10.0.1.jar:na]

キューからの読み取りは機能しません。 これを解決する方法を知っている人はいますか?

編集:私は最終的に、ブロック、次のエラーの内容をコメントする場合

[2m2017-02-27午後04時42分27秒をしています。7m-1] [39m [36m、39m [2m: [0; 39mメッセージはエラーのために無視されます。[msg = MessageAndMetadata(test、0、Message(magic = 1、属性= 0、CreateTime = -1、crc = 2558126716、key = java.nio.HeapByteBuffer [pos = 0 1つのキャップ= 79]、payload = java.nio.HeapByteBuffer [pos = 0 lim = 74 cap = 74])、15941704、kafka.serializer.StringDecoder @ 74a96647、kafka.serializer.StringDecoder @ 42849d34、-1、CreateTime )]

java.lang.IllegalStateException:データストリーマーが閉じられました。 at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:401)〜[ignite-core-1.8.0.jar:1.8.0] at org.apache.ignite.internal。 (DataStreamerImpl.addDataInternal(DataStreamerImpl.java:613)〜[ignite-core-1.8.0.jar:1.8.0] at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl。 java:667)〜[ignite-core-1.8.0.jar:1.8.0] at org.apache.ignite.stream.kafka.KafkaStreamer $ 1.run(KafkaStreamer.java:180)〜[ignite-kafka-1.8 .0.jar:1.8.0] at java.util.concurrent.Executors $ RunnableAdapter.call(Executors.java:511)[na:1.8.0_111] at java.util.concurrent.FutureTask.run(FutureTask。 java:266)[na:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.j (スレッド:1142)[na:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617)[na:1.8.0_111] でjava.lang.Thread.run(スレッド。 java:745)[na:1.8.0_111]

ありがとう!

答えて

1

これは、KafkaStreamerが起動した直後に閉鎖されているためです(kafkaStreamer.stop()コールfinallyブロック)。 kafkaStreamer.start()は非同期で、カフカから消費するスレッドをスピンアウトして終了します。

+0

答えていただきありがとうございます。 "finally"ブロックの内容にコメントすると、上に投稿したエラーが発生します。 – razvan

+0

これは 'IgniteDataStreamer'も閉じているためです。 try-with-resourcesブロックを取り除くと動作します。 –

+0

こんにちは、私は(キャッシュからの読み方がわからないので)まだ実行するアプリケーションを取得しませんでしたが、少なくとも私はエラーをもう受け取りません。だから私はこの質問に答えてマークし、残りのために新しい質問をオープンします。もう一度おねがいします。多分あなたも見ることができますhttps://stackoverflow.com/questions/42562766/how-to-properly-read-from-ignite-cache – razvan