2011-12-21 26 views
6

スレッド間通信に問題があり、 "ダミーメッセージ"を全面的に使用して解決しました。これは悪い考えですか?可能な解決策は何ですか?プロデューサ - コンシューマ間スレッド間通信

例問題があります。

メインスレッドは、レコードを処理してデータベースに挿入するスレッドを開始します。 メインスレッドは、おそらく巨大なファイルを読み取り、1つのレコード(オブジェクト)を次々にブロッキングキューに格納します。処理スレッドはキューから読み込み、動作します。

「スレッドの処理」にはどのように停止する必要がありますか? キューは空でもかまいませんが、作業は完了せず、スレッドの処理が完了してもそれを中断することはできません。

だから、処理スレッドは挿入が同じトランザクションおよびトランザクションでなければならないこと

while (queue.size() > 0 || !Thread.currentThread().isInterrupted()) { 
    MyObject object= queue.poll(100, TimeUnit.MILLISECONDS); 
    if (object != null) { 
     String data = object.getData(); 
     if (data.equals("END")) { 
      break; 
     } 
    // do work 
    } 
} 
// clean-up 
synchronized queue) { 
    queue.notifyAll(); 
} 
return; 

とメインスレッド

// ...start processing thread... 
while(reader.hasNext(){ 
    // ...read whole file and put data in queue... 
} 
MyObject dummy = new MyObject(); 
dummy.setData("END"); 
queue.put(dummy); 
//Note: empty queue here means work is done 
while (queue.size() > 0) { 
    synchronized (queue) { 
     queue.wait(500); // over-cautios locking prevention i guess 
    } 
} 

注メインスレッドで を扱うことができないん。

これを行うにはどうすればよいでしょうか? (私は学んでおり、「間違ったやり方」を始めたくない)

答えて

4

これらのダミーメッセージは有効です。それは "毒"と呼ばれています。プロデューサーが消費者にそれを止めるために送るもの。

他の可能性は、メインスレッドのどこかでThread.interrupt()を呼び出して、それに応じてInterruptedExceptionをキャッチして処理することです。

+0

渡されたオブジェクトの型に、このために簡単に使用できる文字列フィールドがない場合は、どうすれば使用できますか?テキストが有効なデータである可能性があり、したがって毒と等しい場合はどうなりますか? GUIDを使用しますか? –

+1

理想的には、MyObjectクラスを変更してisPoison()メソッドをそこに追加します。このメソッドがオブジェクトが毒であるかどうかを判断する方法はあなた次第ですが、メッセージがヌル、空であるかどうか、またはメッセージが明示的に毒として作成されたかどうかを比較するなどの簡単なことがあります。アイデアは、MyObjectを2つの実装、つまりisPoison()メソッドで常にfalseを返すMessageObjectとPoisonMessage(常にtrueを返す)とのインタフェースになるようにリファクタリングすることです。しかし、これは深くあなたが何をしているのかに大きく依存しており、実際の状況を考えれば、より良い解決策が存在する可能性があります。 –

2

「ダミーメッセージ」を使用して完全に解決しました。これは の悪い考えですか?可能な解決策は何ですか?

これは「Poison Pills」と呼ばれ、スレッドベースのサービスを停止するための妥当な方法です。

ただし、プロデューサとコンシューマの数がわかっている場合にのみ動作します。

あなたが投稿したコードには、データを生成する「メインスレッド」とデータを消費する「処理スレッド」の2つのスレッドがあり、この状況では「ポイズンピル」がうまく機能します。

しかし、他の生産者もいる場合、消費者はいつ停止するのか(すべての生産者が "Poison Pills"を送信する場合のみ)を知っているか、すべての生産者の数を正確に把握し、消費者の「ポイズンピル」の数がプロデューサーの数に等しい場合は、すべてのプロデューサーが機能しなくなり、消費者が停止することを意味します。

"メインスレッド"ではInterruptedExceptionをキャッチする必要があります。そうでない場合は、 "メインスレッド"が "ポイズンピル"を設定できない可能性があります。あなたはすべてのあなたの問題を解決するためにExecutorServiceを使用しようとすることができ、また、以下のように

... 
try { 
    // do normal processing 
} catch (InterruptedException e) { /* fall through */ } 
finally { 
    MyObject dummy = new MyObject(); 
    dummy.setData("END"); 
    ... 
} 
... 

を行うことができます。

(あなただけのいくつかの作品を行い、その後、すべてが終了したときに停止する必要があるとき、それは動作します)

void doWorks(Set<String> works, long timeout, TimeUnit unit) 
    throws InterruptedException { 
    ExecutorService exec = Executors.newCachedThreadPool(); 
    try { 
     for (final String work : works) 
      exec.execute(new Runnable() { 
        public void run() { 
         ... 
        } 
       }); 
    } finally { 
     exec.shutdown(); 
     exec.awaitTermination(timeout, unit); 
    } 
} 

私が学んだし、「それを間違った方法をやって」を開始する必要はありません

書籍:Java Concurrency in Practiceを読む必要があります。私を信じて、それは最高です。

+0

私はエグゼキュータ・サービスについて知っていますが、データの読み取りと処理(検証)と挿入は、同じデータベース・トランザクションで実行する必要があるため、同じタスクまたはメソッドにすることはできません。私が何を意味するのか理解していれば。 –

+0

OK、私はそれを得る、 'ExecutorService'は私の一般的な推奨事項です、それを使用するかどうかあなたの要件に応じて。しかし、すべての例外をキャッチするように注意してください。いくつかの例外がキャッチされない場合は、 "Poison Pills"を設定する可能性が失われる可能性があります。 –

0

私が最近のプロジェクトで行ったことは、キューをラップしてから'isOpen()'メソッドを追加することです。

while (reader.hasNext()) { 
    // read the file and put it into the queue 
    dataQ.put(someObject); 
} 
// now we're done 
dataQ.close(); 

と読者のスレッド::

class ClosableQ<T> { 

    boolean isOpen = true; 

    private LinkedBlockingQueue<T> lbq = new LinkedBlockingQueue<T>(); 

    public void put(T someObject) { 
     if (isOpen) { 
     lbq.put(someObject); 
     } 
    } 

    public T get() { 
     if (isOpen) { 
     return lbq.get(0); 
     } 
    } 

    public boolean isOpen() { 
     return isOpen; 
    } 

    public void open() { 
     isOpen = true; 
    } 

    public void close() { 
     isOpen = false; 
    } 
} 

だからあなたのライター・スレッドのようなものになり

while (dataQ.isOpen) { 
    someObject = dataQ.get(); 
} 

もちろんの代わりにリストを拡張することができますが、それはユーザーにレベルを与えますあなたが望むかもしれないアクセス。 AtomicBooleanのように、このコードにいくつかの並行処理を追加する必要があります。

+0

私はメソッドにキューを渡しています。あなたの考えを読んで私には次のようなことができました。このメソッドは、AtomicBooleanのkeepProcessingを受け入れることもできます。それから、queue.size> 0 ||の長さで実行できます。 keepProcessing.get() –