2016-06-21 10 views
0

私の場合は、複数のトピックを複数のコンシューマにリンクする必要があります。私はトピックごとに消費者グループを設定したい。 kafka .netクライアントでは、コンシューマグループを動的に作成して、そのコンシューマグループとトピックをリンクできるようにする方法はありませんでした。私はkafka serverの設定またはZookeeperに変更する必要がある場合はkafka 0.9.0バージョンを使用していますか?複数のトピックの複数のコンシューマグループを作成する

答えて

0

"トピックごとにコンシューマグループを設定する"とはどういう意味かわかりません。新しいコンシューマ・グループを開始すると(つまり、対応するグループIDを持つコンシューマを起動すると)、コンシューマ・グループは(トピックに登録するだけで)コンシューマにするトピックを決定します。これは特別な設定ではないため、トピック購読では常にグループが動的に作成されます。

アップデート(.NETクライアント):

は、私は、.NETクライアントに精通していないです。しかし、Githubのページ(github.com/Jroland/kafka-net)によると、消費者グループはまだサポートされていないようだ。

ただし、ホワイトリストを使用して特定のパーティションのみを読み取ることができます。このように、手動で負荷を分散することができます

https://github.com/Jroland/kafka-net#consumer-1から:

何のホワイトリストが提供されていない場合は、すべてのパーティションは、私が建てられていた各パーティションのリーダー

+0

コンシューマ・グループを作成し、トピックまたはクラスまたはファンクションに割り当てる方法 –

+0

JavaDoc(短い例もあります)を読んでください:https://kafka.apache.org/0100/javadoc/index.html ?org/apache/kafka/clients/consumer/KafkaConsumer.html –

+0

しかし、私は.Netクライアントを使用しています。 –

0

のための1 KafkaConnectionを作成消費されます以下のリンクとしてMicrosoft .NET kafkaを使用した簡単なプロトタイプです。それがあなたの問題を解決しているかどうかは分かりません。

はしかし、私はhightlyサンプル

https://github.com/Microsoft/CSharpClient-for-Kafka

(例えばなど、オフセット、トピックグループを維持するための飼育係をサポートしている)、それはカフカネットよりもはるかに多くの機能が含まれているので、あなたがこのライブラリを使用することをお勧めしていますコード

これは、kafkaに10メッセージを送信し、コンシューマーがそれを取得したときにコンソールに出力します。

static void Main(string[] args) 
    { 
     Task.Factory.StartNew(() => 
     { 
      ConsumerConfiguration consumerConfig = new ConsumerConfiguration 
      { 
       AutoCommit = true, 
       AutoCommitInterval = 1000, 
       GroupId = "group1", 
       ConsumerId = "1", 
       AutoOffsetReset = OffsetRequest.SmallestTime, 
       NumberOfTries = 20, 
       ZooKeeper = new ZooKeeperConfiguration("localhost:2181", 30000, 30000, 2000)   
      }; 
      var consumer = new ZookeeperConsumerConnector(consumerConfig, true); 
      var dictionaryMapping = new Dictionary<string, int>(); 
      dictionaryMapping.Add("topic1", 1); 

      var streams = consumer.CreateMessageStreams(dictionaryMapping, new DefaultDecoder()); 

      var messageStream = streams["topic1"][0]; 

      foreach (var message in messageStream.GetCancellable(new CancellationToken())) 
      { 
       Console.WriteLine("Response: P{0},O{1} : {2}", message.PartitionId, message.Offset, Encoding.UTF8.GetString(message.Payload)); 

       //If you set AutoCommit to false, you can commit by yourself from this command. 
       //consumer.CommitOffsets()  
      } 
     }); 


     var brokerConfig = new BrokerConfiguration() 
     { 
      BrokerId = 1, 
      Host = "localhost", 
      Port = 9092 
     }; 
     var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig }); 
     config.CompressionCodec = CompressionCodecs.DefaultCompressionCodec; 
     config.ProducerRetries = 3; 
     config.RequiredAcks = -1;    
     var kafkaProducer = new Producer(config); 

     byte[] payloadData = Encoding.UTF8.GetBytes("Test Message"); 
     var inputMessage = new Message(payloadData); 
     var data = new ProducerData<string, Message>("topic1", inputMessage); 

     for (int i = 0; i < 10; i++) 
     { 
      kafkaProducer.Send(data); 
     } 

     Console.ReadLine(); 
    } 

は、このヘルプを願っています。

関連する問題