私の場合は、複数のトピックを複数のコンシューマにリンクする必要があります。私はトピックごとに消費者グループを設定したい。 kafka
.netクライアントでは、コンシューマグループを動的に作成して、そのコンシューマグループとトピックをリンクできるようにする方法はありませんでした。私はkafka server
の設定またはZookeeper
に変更する必要がある場合はkafka 0.9.0
バージョンを使用していますか?複数のトピックの複数のコンシューマグループを作成する
答えて
"トピックごとにコンシューマグループを設定する"とはどういう意味かわかりません。新しいコンシューマ・グループを開始すると(つまり、対応するグループIDを持つコンシューマを起動すると)、コンシューマ・グループは(トピックに登録するだけで)コンシューマにするトピックを決定します。これは特別な設定ではないため、トピック購読では常にグループが動的に作成されます。
アップデート(.NETクライアント):
は、私は、.NETクライアントに精通していないです。しかし、Githubのページ(github.com/Jroland/kafka-net)によると、消費者グループはまだサポートされていないようだ。
ただし、ホワイトリストを使用して特定のパーティションのみを読み取ることができます。このように、手動で負荷を分散することができます
をhttps://github.com/Jroland/kafka-net#consumer-1から:
何のホワイトリストが提供されていない場合は、すべてのパーティションは、私が建てられていた各パーティションのリーダー
のための1 KafkaConnectionを作成消費されます以下のリンクとしてMicrosoft .NET kafkaを使用した簡単なプロトタイプです。それがあなたの問題を解決しているかどうかは分かりません。
はしかし、私はhightlyサンプル
https://github.com/Microsoft/CSharpClient-for-Kafka
(例えばなど、オフセット、トピックグループを維持するための飼育係をサポートしている)、それはカフカネットよりもはるかに多くの機能が含まれているので、あなたがこのライブラリを使用することをお勧めしていますコード
これは、kafkaに10メッセージを送信し、コンシューマーがそれを取得したときにコンソールに出力します。
static void Main(string[] args)
{
Task.Factory.StartNew(() =>
{
ConsumerConfiguration consumerConfig = new ConsumerConfiguration
{
AutoCommit = true,
AutoCommitInterval = 1000,
GroupId = "group1",
ConsumerId = "1",
AutoOffsetReset = OffsetRequest.SmallestTime,
NumberOfTries = 20,
ZooKeeper = new ZooKeeperConfiguration("localhost:2181", 30000, 30000, 2000)
};
var consumer = new ZookeeperConsumerConnector(consumerConfig, true);
var dictionaryMapping = new Dictionary<string, int>();
dictionaryMapping.Add("topic1", 1);
var streams = consumer.CreateMessageStreams(dictionaryMapping, new DefaultDecoder());
var messageStream = streams["topic1"][0];
foreach (var message in messageStream.GetCancellable(new CancellationToken()))
{
Console.WriteLine("Response: P{0},O{1} : {2}", message.PartitionId, message.Offset, Encoding.UTF8.GetString(message.Payload));
//If you set AutoCommit to false, you can commit by yourself from this command.
//consumer.CommitOffsets()
}
});
var brokerConfig = new BrokerConfiguration()
{
BrokerId = 1,
Host = "localhost",
Port = 9092
};
var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
config.CompressionCodec = CompressionCodecs.DefaultCompressionCodec;
config.ProducerRetries = 3;
config.RequiredAcks = -1;
var kafkaProducer = new Producer(config);
byte[] payloadData = Encoding.UTF8.GetBytes("Test Message");
var inputMessage = new Message(payloadData);
var data = new ProducerData<string, Message>("topic1", inputMessage);
for (int i = 0; i < 10; i++)
{
kafkaProducer.Send(data);
}
Console.ReadLine();
}
は、このヘルプを願っています。
- 1. 複数のトピックへのストリーミングメッセージ
- 2. Neo4jの複数のプロパティに複数のインデックスを作成する
- 3. ハイブ:複数のファイルを複数のディレクトリに作成する
- 4. 複数のディレクトリから複数のtar.gzアーカイブを作成する
- 5. 単一のプロデューサから複数のコンシューマ(同じコンシューマグループ)
- 6. debugging:複数の列(dplyr)の複数のラグを作成する関数
- 7. 複数のフォームインスタンスを作成する
- 8. 複数行のテキストエリアを作成する
- 9. 複数のfb.apiコールを作成する
- 10. C++複数のソケットクライアントを作成する
- 11. 複数のチェックボックスを作成するPHP
- 12. 複数のウィンドウを作成する
- 13. 複数のログインページを作成する
- 14. 複数のmysqlスクリプトを作成する
- 15. 複数のベクトルを作成するr
- 16. 複数のテンポラリテーブルを作成するmySQL
- 17. 複数のオブジェクトを作成するグラフィック
- 18. ビジュアルスタジオソリューションテンプレートを作成する - 複数のプロジェクト
- 19. 複数のantrunタスクを作成する
- 20. 複数のビューを作成する
- 21. Three.js複数のオブジェクトを作成する
- 22. トルネード複数のプロセス:複数のMySQL接続を作成
- 23. モデル作成時に複数のレールが複数ある場合
- 24. RabbitMQ - トピック交換 - 複数の同じトピック同じサブ科目
- 25. 複数の同一のカフカストリームのトピックをマージする
- 26. 複数列対複数列のインデックス作成のコンセプト
- 27. ワードプレスで複数のレイアウトを持つ複数のページを作成するには
- 28. 単一のカフカプロデューサーが複数のトピックにメッセージを生成する方法は?
- 29. 複数のスコープは、私は、複数のスコープを作成する必要が
- 30. 複数のBezierPathから複数のパスを作成する方法
コンシューマ・グループを作成し、トピックまたはクラスまたはファンクションに割り当てる方法 –
JavaDoc(短い例もあります)を読んでください:https://kafka.apache.org/0100/javadoc/index.html ?org/apache/kafka/clients/consumer/KafkaConsumer.html –
しかし、私は.Netクライアントを使用しています。 –