2012-05-04 14 views
0

を扱う私は、次のプロデューサー消費パターン - プロデューサー障害

  • キュータ
  • Aシングル経由で起動プロデューサースレッド、自分自身のBlockingQueueへの書き込みをそれぞれ、一定の数のようなプロデューサ/コンシューマパターンを持っていますプロデューサスレッドを読み取る

各プロデューサは、データベースクエリを実行し、その結果をキューに書き込みます。消費者はすべてのプロデューサキューをポーリングします。現時点では、データベースエラーが発生した場合、プロデューサスレッドは消滅し、その後、コンシューマはプロダクトキュー上のさらなる結果を待っています。

キャッチエラーを正しく処理するにはどうすればよいですか?

+0

try/catch内でfuture.get()を使用しますか? – assylias

答えて

1

プロデューサが死んだときの唯一の選択肢は、消費者を止めることだと思われます。

これを行うには、毒丸を使用できます。これは、プロデューサが停止し、消費者がそれを受け取ったときに停止することを知っているときに追加する特別なオブジェクトです。毒ピルはfinallyブロックに追加することができるので、プロデューサーがどのように殺されたかに関係なく常に追加されます。

コンシューマが1つしかない場合は、1つのキューを使用します。このようにして、消費者はすべての生産者が死亡した場所だけをブロックします。

+0

キューは、プロデューサから戻ってくるデータの性質上、別々に保管する必要があります。私は実際にBlockingQueueの周りのカスタムキューラッパーを使用してデータを覗き込み、適切に照合します。 –

+0

プロデューサが照合を実行できない理由はありますか? –

+0

各生成スレッドはDBクエリであり、消費者は各キューから最初のアイテムを検査して順序に並べ替える必要があります。例えば、あるキューは、多くのアイテムを最初に処理してから2番目のアイテムを消費する必要がある別のアイテムの前にアイテムを生成している可能性があります。 –

4

私はかつて同じようなことをして、死んでいるプロデューサスレッドがcatchブロックからキューに押し込むというセンチネル値を使用することに決めました。例外自体をプッシュすることができます(これはほとんどのシナリオで機能します)。または、そのための特別なオブジェクトがあります。いずれにしても、デバッグの目的で例外をコンシューマにプッシュするのはすばらしいことです。

+0

これは正しいアプローチです。 「それが何であれ」のインスタンスを適切なエラーデータ(例外オブジェクトなど)でプッシュします。通常、データを運ぶクラスに成功/失敗/エラーメンバーが含まれているため、エラーの原因となったデータがコンシューマに送信される場合は、より簡単です。消費者は、インスタンスをロガーおよび/またはGUIに転送することができ、例外およびそれを生成したすべてのソースデータをテキスト化することができます。 –

+0

すべてのキューアイテムにコンテナオブジェクトが使用されている場合は、そのメソッドの呼び出し時に例外を再現する特別な実装を行うことができます。それは、他の立場から受け入れられるならば、コードを本当に単純にします。 –

+0

私は既にプロデューサーがプロダクションを終えたとき(ポイズンピル)を検出するために似たようなことをしているので、これを処理するためにそれを拡張することもできます。 –

0

特定の時間キューに要素がなくなると、消費者を殺すタイムアウトを追加することがあります。

もう1つの方法として、プロデューサに「生きている」フラグを保持させ、そのフラグをfalseに設定して死にかけていることを知らせる方法があります。プロデューサが連続して実行されても、データベースから常に結果が得られるとは限りません。「生きている」フラグは、プロデューサが最後に生きていたと報告した時刻になります。そして、タイムアウトを使用して、生きているのはずっと前です)。

1

キューに実際にプッシュするクラスは、コンシューマが失敗を確認できるように成功/失敗/エラーのメンバーが含まれている必要があります。

Peterは既に1つのキューを使用することを提案しています - どのポーリングでも特定の問題を回避する方法はわかりません。キュー上のオブジェクトには、どのプロデューサから来たプロデューサ、もし必要なら。

+0

アイテムは特定の順序で作成されるため、アイテムを混ぜることは実際には機能しません。 –

0

私自身の質問に答える。

次のクラスを使用しました。それはRunnableのリストを受け取り、それらをすべて並列に実行します。失敗した場合は、他のすべてのものを中断します。それから私は中断されたときに私の生産者と消費者が優雅に死ぬことを中断しています。

これは私の場合にうまく機能します。

私がいくつかのアイデアをくれたので、すべてのコメント/回答をありがとう。

// helper class that does the following 
// 
// if any thread has an exception then interrupt all the others with an eye to cancelling them 
// if the thread calling execute() is interrupted then interrupt all the child threads 
public class LinkedExecutor 
{ 
    private final Collection<Runnable> runnables; 
    private final String name; 

    public LinkedExecutor(String name, Collection<Runnable> runnables) 
    { 
     this.runnables = runnables; 
     this.name = name; 
    } 

    public void execute() 
    { 
     ExecutorService executorService = Executors.newCachedThreadPool(ConfigurableThreadFactory.newWithPrefix(name)); 

     // use a completion service to poll the results 
     CompletionService<Object> completionService = new ExecutorCompletionService<Object>(executorService); 

     for (Runnable runnable : runnables) 
     { 
      completionService.submit(runnable, null); 
     } 

     try 
     { 
      for (int i = 0; i < runnables.size(); i++) 
      { 
       Future<?> future = completionService.take(); 
       future.get(); 
      } 
     } 
     catch (InterruptedException e) 
     { 
      // on an interruption of this thread interrupt all sub-threads in the executor 
      executorService.shutdownNow(); 

      throw new RuntimeException("Executor '" + name + "' interrupted", e); 
     } 
     catch (ExecutionException e) 
     { 
      // on a failure of any of the sub-threads interrupt all the threads 
      executorService.shutdownNow(); 

      throw new RuntimeException("Execution execution in executor '" + name + "'", e); 
     } 
    } 
} 
関連する問題