2016-05-25 4 views
1

...Kafka Consumer APIのキーを使用してデータを読み取る方法は?私は賢明なデータのトピックを読んでいるので、私はコードの下に使用してメッセージを構築しています

Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig); 
KeyedMessage<String, String> keyedMsg = new KeyedMessage<String, String>(topic, "device-420", "{message:'hello world'}");   
producer.send(keyedMsg); 

し、次のコードブロックを使用して消費...

//Key = topic name, Value = No. of threads for topic 
Map<String, Integer> topicCount = new HashMap<String, Integer>();  
topicCount.put(topic, 1); 

//ConsumerConnector creates the message stream for each topic 
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConnector.createMessageStreams(topicCount);   

// Get Kafka stream for topic 
List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic); 

// Iterate stream using ConsumerIterator 
for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) { 
    ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();   
    while (consumerIte.hasNext()) {    
     MessageAndMetadata<byte[], byte[]> msg = consumerIte.next();    
     System.out.println(topic.toUpperCase() + ">" 
       + " Partition:" + msg.partition() 
       + " | Key:"+ new String(msg.key()) 
       + " | Offset:" + msg.offset() 
       + " | Message:"+ new String(msg.message())); 
    } 
} 

すべてが正常に動作しています。メッセージを使用してデータを使用する方法はありますか?device-420この例では、

答えて

1

短い回答:いいえ。

カフカの最小単位は、パーティションです。単一のパーティションからのみ読み取るクライアントを作成できます。ただし、パーティションには複数のキーを含めることができ、このパーティションに含まれるすべてのキーを使用する必要があります。

+0

ありがとうございました!あなたは私の時間を救った。しかし、どのようにしてデータパーティションにアクセスできますか?あなたはポストコードをお願いできますか? – Khan

+0

あなたが使用しているカフカのバージョンに依存します。ここをクリック0.8:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example –

関連する問題