1
stream.addSink(new FlinkKafkaProducer09<String>("Topic-name", new SimpleStringSchema(),
properties));
FlinkkafkaProducerのコールバックや完了はありますか?カフカシンクに生産した後の承認方法(FlinkKafkaProducer09を使用)
stream.addSink(new FlinkKafkaProducer09<String>("Topic-name", new SimpleStringSchema(),
properties));
FlinkkafkaProducerのコールバックや完了はありますか?カフカシンクに生産した後の承認方法(FlinkKafkaProducer09を使用)
プロデューサーにメッセージを送信できます。
メッセージを送信したら、コールバックを書き込むことができます。それは高価な操作だと思った、あなたは一般的にそれを避けてください。
send()メソッドは非同期です。呼び出されると、レコードは保留中のレコードの バッファに追加され、すぐに戻ります。これにより、 プロデューサは効率のために個々のレコードをバッチ処理できます。
あなたは、その使用を見ることができます:
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
});
は詳細についてdocumentationを参照してください。
申し訳ございませんが、ご使用のケースを詳細にご説明いただけますか? –