2

Twitterのデータをtwitter apiでストリームするためにPythonスクリプトraw_tweets_stream.pyを作成しました。 twitterからのjsonデータは、以下のスクリプトを使用してkafkaプロデューサにピップされます。kafkaのコマンドラインを使ってjson tweetsイベントをKafkaのトピック/プロデューサーに送信できません


`python raw_tweets_stream.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list localhost:2181 --topic raw_json_tweets` 

raw_json_tweetsこれらのツイート用に作成されたカフカのトピックです。 Pythonスクリプトraw_tweets_stream.pyは正常に動作しますが、カフカプロデューサに送信中にエラーをスローします。私はHortonworks HDP 2.3.1サンドボックスを使用しています。私は動物園とカフカが始動したことを確認しました。


/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic raw_json_tweets

Topic:raw_json_tweets  PartitionCount:1  ReplicationFactor:1  Configs: 
      Topic: raw_json_tweets  Partition: 0 Leader: 0  Replicas: 0  Isr: 0 

エラー:

[2016-08-25 22:36:26,212] ERROR Failed to send requests for topics raw_json_tweets with correlation ids in [57,64] (kafka.producer.async.DefaultEventHandler) 
[2016-08-25 22:36:26,213] ERROR Error in handling batch of 131 events (kafka.producer.async.ProducerSendThread) 
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 
     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91) 
     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) 
     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) 
     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) 
     at scala.collection.immutable.Stream.foreach(Stream.scala:547) 
     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) 
     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) 
[2016-08-25 22:36:27,217] WARN Fetching topic metadata with correlation id 65 for topics [Set(json_tweets1)] from broker [BrokerEndPoint(0,localhost,2181)] failed (kafka.client.ClientUtils$) 
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
     at kafka.utils.CoreUtils$.read(CoreUtils.scala:193) 
     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) 
     at kafka.network.Receive$class.readCompletely(Transmission.scala:56) 
     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) 
     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131) 
     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:77) 
     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74) 
     at kafka.producer.SyncProducer.send(SyncProducer.scala:115) 
     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) 
     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) 
     at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) 
     at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188) 
     at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152) 
     at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151) 
     at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) 
     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) 
     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) 
     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) 
     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) 
     at scala.collection.immutable.Stream.foreach(Stream.scala:547) 
     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) 
     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) 

更新:ソリューション


  1. アンバリサービスに行き、Kafkaログディレクトリを/tmp/kafka-logsに変更しました。
  2. 元のスクリプトに正しいポートとホスト名を含めるように変更しました。

    python raw_tweets_stream.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic raw_json_tweets

  3. イベントは、コンソールの消費者を使用してカフカトピックに送信されていることを確認しました。

    /usr/hdp/2.3.0.0-2557/kafka/bin/kafka-console-consumer.sh -zookeeper sandbox.hortonworks.com:2181 -topic raw_json_tweets -from-beginning

答えて

1

あなたが飼育係に--broker-list2181)を指しているようではなく、あなたがそのデフォルトポートAmbariに9092または6667あるカフカブローカーで指定する必要がありそうです。

+0

@Binary Nerdを指摘してくれてありがとう。私はkafkaブローカーを正しいポート9092で更新しましたが、まだエラーを投げます。ここではエラーの一部です - > [2016-08-26 13:24:12,718] ERRORトピック、トピック別にメッセージを照合できませんでした。トピックのトピックメタデータを取得しています[Set(raw_json_tweets)] from broker [ArrayBuffer(BrokerEndPoint (0、localhost、9092))] failed(kafka.producer.async.DefaultEventHandler).. java.nio.channels.ClosedChannelException – gkc123

+0

Hortonworksの文書によると、Ambariを使用している場合、デフォルトのポートは '6667'それを試してください - https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.2/bk_secure-kafka-ambari/content/ch_secure-kafka-config-options.html –

+0

あなたは正しいです。ホートンワークスの正しい港は6667です(アンバリサービスに行くことも確認されています)。正しいポート '6667'とフルホスト名' sandbox.hortonworks.com'を含むようにスクリプトを修正しました。魅力的に働きました。 'python raw_tweets_stream.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic raw_json_tweets' – gkc123

関連する問題