2016-10-07 5 views
1

I以前私は、それぞれ以下のようにKafkaProducerが新た各要求で構築されたrequest.whereためプロデューサを作成するコードの下使用し、メッセージを生成するプロデューサのクラスを使用するマルチスレッドアプリケーションを持っている:私はカフカを読ん一度閉じたカフカプロデューサーを再接続するには?

KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop); 

ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes); 
producer.send(data, new Callback() { 

       @Override 
       public void onCompletion(RecordMetadata metadata, Exception exception) { 
        if (exception != null) { 
         isValidMsg[0] = false; 
         exception.printStackTrace(); 
         saveOrUpdateLog(msgBean, producerType, exception); 
         logger.error("ERROR:Unable to produce message.",exception); 
        } 
       } 
      }); 
producer.close(); 

私たちが良いパフォーマンスを得るためには、単一のプロデューサインスタンスを使用すべきであることを知りました。

次に、シングルトンクラス内にKafkaProducerの単一インスタンスを作成しました。

今度は&プロデューサーを閉じる必要があります。

java.lang.IllegalStateException: Cannot send after the producer is closed. 

OR我々は一度閉じプロデューサーに再接続することができる方法を:明らかに最初の要求を送信した後、我々は生産を終了した場合、それは文句を言わないので、投げたメッセージを再送信するプロデューサーを見つけます。 プログラムがクラッシュしたり、例外が発生したりするのは問題ですか?

答えて

2

KafkaProducer.sendメソッドは非同期で、Future[RecordMetadata]を返します。送信後すぐにcloseを送信した場合、競合状態になり、KafkaProducerのバッファリングの性質上、メッセージが送信されないことがあります。

プロデューサーがアプリケーションの存続期間中使用されている場合は、プロジェクターを終了してからプロジェクターを終了させます。ドキュメントで述べたように、プロデューサはマルチスレッド環境で使用するのが安全です。したがって、同じインスタンスを再利用する必要があります。

場合によっては、KafkaProducerを終了する必要があると思われる場合は、お客様のKafkaオブジェクト内にisClosedフラグを追加し、消費者がデータを再送信する必要がある場合に監視することができます。ラフスケッチをすることができます:

object KafkaOwner { 
    private var producer: KafkaProducer = ??? 
    @volatile private var isClosed = false 

    def close(): Unit = { 
    if (!isClosed) { 
     kafkaProducer.close() 
     isClosed = true 
    } 
    } 

    def instance: KafkaProducer = { 
    this.synchronized { 
     if (!isClosed) producer 
     else { 
     producer = new KafkaProducer() 
     isClosed = false 
     } 
    } 
    } 
} 
+0

これはsync/asyncの両方です。さらに、例外が発生した場合:アプリケーションがクラッシュし、再接続する方法がいくつか考えられます。注:私はnullではないKafkaProducerのisntanceを再初期化せず、close()メソッドが呼び出された後でもすべてのプロパティを保持します。また、私は複数のアプリすなわち4つの消費者がこの共有のプロデューサーを使用して複数のトピックにmsgを送信します。 – usman

+0

@usmanなぜあなたはそれが同期と非同期の両方であると言いますか? [同期版はどこにありますか](https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html)? –

+0

https://kafka.apache.org/08/documentation#implementation "2つの低レベルのプロデューサをラップするプロデューサAPI - "、。まあ、kafkaが提供する方法はありますか?あなたのコードは、オブジェクトをインスタンス化し直さなければならないことを示しています。 – usman

1

KafkaProducerのjavadocで説明したように:

public void close() 

Close this producer. This method blocks until all previously sent requests complete. 
This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS). 

はSRC:https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()

だから、あなたはあなたのメッセージが送信されないことを心配する必要はありません。たとえあなたが送信直後に電話をかけたとしても。

KafkaProducerを複数回使用する予定がある場合は、使用を終了した後で終了してください。メソッドが完了してバッファで待機しないうちにメッセージが実際に送信されることを保証したい場合は、KafkaProducer#flush()を使用して、現在のバッファが送信されるまでブロックします。必要に応じてFuture#get()をブロックすることもできます。

は、あなたが今まで(あなただけのいくつかのデータを送信し、アプリがすぐにを送信した後に終了短命アプリで例えば)あなたのKafkaProducerを閉鎖する予定がない場合注意すべき1つの警告もあります。 KafkaProducer IOスレッドはデーモンスレッドです。つまり、このスレッドがVMの終了を完了するまでJVMは待機しません。したがって、メッセージが実際に送信されるようにするには、KafkaProducer#flush()、no-arg KafkaProducer#close()またはブロックFuture#get()を使用してください。

関連する問題