2016-06-23 5 views
3

をリバランスすることができませんでしたが、私は、トピックを作成し、私はそのトピック例外:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-file-input 

bin/kafka-console-producer.sh --broker-list localhost:9092 --streams-file-input 
にいくつかのメッセージを公開するために、単純なプロデューサーを置きます

私はカフカの流れで、以下の簡単な例を実行していますし、私は私が扱うことができない奇妙な例外が発生しました

Properties props = new Properties(); 
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); 
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.3:9092"); 
      props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
      props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 

      // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data 
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

      KStreamBuilder builder = new KStreamBuilder(); 

      builder.stream("streams-file-input").to("streams-pipe-output"); 

      KafkaStreams streams = new KafkaStreams(builder, props); 
      streams.start(); 

      // usually the stream application would be running forever, 
      // in this example we just let it run for some time and stop since the input data is finite. 
      Thread.sleep(5000L); 

      streams.close(); 

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance 
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299) 
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) 
     Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager 
      at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71) 
      at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86) 
      at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550) 
      at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577) 
      at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68) 
      at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123) 
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222) 
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232) 
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227) 
      at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) 
      at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) 
      at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) 
      at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) 
      at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) 
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436) 
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422) 
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) 
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) 
      at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) 
      at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) 
      at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) 
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) 
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) 
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) 
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) 
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) 
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) 
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) 
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) 
      at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) 
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) 
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) 
      ... 1 more 
     Caused by: java.io.FileNotFoundException: C:\tmp\kafka-streams\my-streapplication\0_0\.lock (The system cannot find the path specified) 
      at java.io.RandomAccessFile.open0(Native Method) 
      at java.io.RandomAccessFile.open(RandomAccessFile.java:316) 
      at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243) 
      at org.apache.kafka.streams.processor.internals.ProcessorStateManager.lockStateDirectory(ProcessorStateManager.java:125) 
      at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:93) 
      at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69) 

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-streams</artifactId> 
     <version>0.10.0.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.0.0</version> 
    </dependency> 

私はこの例外を何とかしました。私はUbuntuでvmwareのkafkaクラスタを実行しています(私が使用しているバージョンはkafka_2.11-0.10.0.0です)おそらく問題はRAM-Cpuですか?

答えて

4
Caused by: java.io.FileNotFoundException: C:\tmp\kafka-streams\my-streapplication\0_0\.lock (The system cannot find the path specified) 

あなたのアプリケーション状態の親ディレクトリC:\tmp\kafka-streamsはexsistではありません。デフォルトディレクトリはStreamConfigです。私はWindows上でなぜ失敗したのか分かりません。

StreamConfig.STATE_DIR_CONFIGを指定したディレクトリに設定できます。

+0

神はあなたを祝福!!!!!! –

1

おかげで、これは正しい修正です@Muyooします

 Properties props = new Properties(); 
     props.put(StreamsConfig.APPLICATION_ID_CONFIG,"my-stremapplication"); 
     props.put(StreamsConfig.STATE_DIR_CONFIG, "streams-pipe"); 
     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.210:9092"); 
     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());