2013-03-10 9 views
5

flume-ngのカスタムシンクを作成しようとしています。私は既存のシンクとドキュメンテーションを見て、それをコーディングしました。しかし、イベントを受け取るはずの 'process()'メソッドは常にnullで終了します。 私はしているイベントイベント= channel.take();イベントはnullです。このメソッドは、イベントがまだチャンネルにあるときに繰り返し呼び出されます。Flume-ngヌルイベントのカスタムシンク

誰かが正しい方向に向かうことができますか?

答えて

5

これは、処理機能のスケルトンです...あなたがBACKOFFにステータスを変更し、をロールバックイベントを取得して失敗した場合。あなたでない場合をコミットし、ステータスをREADYに設定します。何があっても、あなたは常に取引を終了します。

Status status = null; 
    Channel channel = getChannel(); 
    Transaction transaction = channel.getTransaction(); 
    transaction.begin(); 
    try { 
     Event event = channel.take(); 

     if (event != null && validEvent(event.getBody()) >= 0) { 
      # make some printing 
     } 
     transaction.commit(); 
     status = Status.READY; 
    } catch (Throwable ex) { 
     transaction.rollback(); 
     status = Status.BACKOFF; 
     logger.error("Failed to deliver event. Exception follows.", ex); 
     throw new EventDeliveryException("Failed to deliver event: " + ex); 
    } finally { 
     transaction.close(); 
    } 
    return status; 

私はこれがうまくいくと確信しています:)。

+0

素晴らしいおかげでそれはまだ2016年に私を助けます。.. – logicalgeek

+0

はちょっと私はここで同様の問題があります。https://stackoverflow.com/questions/46479157/streaming-kafka- messages-to-mysql-database あなたはこれについて考えていますか? –

4

これは仕様です。シンクランナーはシンクをnullイベントでポーリングするので、シンクが生きていて将来のイベントを受け入れる準備ができていることを確認できます。 nullイベントを受け取ったら、Status.BACKOFFを返すことを確認してから、シンクプロセッサーがもう一度お試しください。

+0

[documentation](http://flume.apache.org/FlumeDeveloperGuide.html#sink)には何も言われていません。 – Dmitry

+0

私は同意します。 Flumeのドキュメントは非常に最小限であり、少し詳しく説明する必要があります。 – logicalgeek

+0

バックオフ期間はどのくらいですか?そしてそれはどのように制御されていますか? AbstractSinkクラスは、ソースのようなメソッドを実装していません。 。。 公共長いgetBackOffSleepIncrement() 公共長いgetMaxBackOffSleepInterval( – bearrito

関連する問題