2011-08-04 10 views
1

各スレッドが必要な入力を待って計算を行い、最後にその出力値を特定のスレッドに送信する必要があるスレッドセットがあります。処理されたメッセージを特定のスレッドに送信

私はスレッドの名前とスレッド自体を含むグローバルマップを持って、各スレッドが名前で "後継者"スレッドを取得してからそれらの値を送るようにする予定です。

まず、私は、ブロッキングキューを使用して生産者 - 消費者の例を見て:

class Consumer implements Runnable { 
    private final BlockingQueue queue; 

    Consumer(BlockingQueue q) { 
     queue = q; 
    } 

    public void run() { 
     try { 
      while(true) { 
       System.out.println("Waiting for input"); 
       consume(queue.take()); 
      } 
     } catch (InterruptedException ex) { 
      ex.printStackTrace(); 
     } 
    } 

    void consume(Object x) { 
     System.out.println("Received: " + x); 
    } 
} 

class Setup { 
    public static void main(String...args) { 
     BlockingQueue q = new ArrayBlockingQueue<String>(10); 
     Producer p = new Producer(q); 
     Consumer c1 = new Consumer(q); 
     Consumer c2 = new Consumer(q); 
     new Thread(p).start(); 
     new Thread(c1).start(); 
     new Thread(c2).start(); 
    } 
} 

私は、各スレッドのブロッキングキューを持っていると考えていました。コンシューマースレッドは、すべての目的の値を受け取るまで、queue.take()をループします。

この後、私はこのpostを見つけました。そこに私と同様の質問が尋ねられます。提案されたソリューションは、ブロッキングキューソリューションよりも簡単に思えます。メッセージを送信するスレッドのメソッドを呼び出すことに基づいています。

私は2つのアプローチのどちらが最善であるか、あるいは私が望むものを達成するためのよりよい方法があるかについてアドバイスをお願いしたいと思います。

ありがとうございました。

答えて

2

コンシューマープロデューサーは問題ありません。 (ご質問SO参考文献に参照「答えは」ワームの缶であることを...それを経由だと思う...)

あなたはQueuePipe、あるいはPipedInputStreamPipedOutputStreamを使用することができます。 Exchangerもあります。

Exchanger javadocの例のmodです。ネストされたクラスについて心配する必要はありません。それは単なるコンパクトなスタイルです。

ここには「パイプライン」クラスがあります。それは2つのスレッドを持っています(名前のR/Lは左、右を参照しています)。パイプラインの流れはR→Lです。

/* 
* mostly based on 
* http://download.oracle.com/javase/6/docs/api/java/util/concurrent/Exchanger.html 
*/ 
package so_6936111; 

import java.util.concurrent.Exchanger; 

public class WorkflowDemo { 

    public static void main(String[] args) { 
     Pipeline pipeline = new Pipeline(); 
     pipeline.start(); 
    } 
    // ---------------------------------------------------------------- 
    // Pipeline 
    // ---------------------------------------------------------------- 

    public static class Pipeline { 

     /** exchanger for messages */ 
     Exchanger<Message> exchanger = new Exchanger<Message>(); 

     /* the two message instances that are passed back and forth */ 
     Message msg_1 = new Message(); 
     Message msg_2 = new Message(); 

     /** startups the pipeline */ 
     void start() { 
      new Thread(new WorkerR()).start(); 
      new Thread(new WorkerL()).start(); 
     } 


     /** Message objects are passed between workflow threads */ 
     public static class Message { 
      private Object content; 
      public Object getContent() { return content; } 
      public void setContent(Object c) { this.content = c; } 
     } 


     /** WorkerR is at the head of the pipeline */ 
     class WorkerR implements Runnable { 
      public void run() { 
       Message message = msg_1; 
       try { 
        while (true) { 
         Object data = doSomeWork(); 
         message.setContent(data); 
         message = exchanger.exchange(message); 
        } 
       } catch (InterruptedException ex) { ex.printStackTrace();} 
      } 
      /** 
      * let's pretend this is where you get your 
      * initial data and do some work 
      */ 
      private Object doSomeWork() { 
       return String.format("[email protected]:%d", System.nanoTime()); 
      } 
     } 

     /** WorkerL is at the tail of the pipeline */ 
     class WorkerL implements Runnable { 
      public void run() { 
       Message message = msg_2; 
       try { 
        while (true) { 
         message = exchanger.exchange(message); 
         Object data = doPostProcessing(message.getContent()); 
         System.out.format("%s\n", data); 
        } 
       } catch (InterruptedException ex) { ex.printStackTrace();} 
      } 

      /** 
      * Let's pretend this is where the 2nd step of the workflow. 
      */ 
      private Object doPostProcessing(Object data) { 
       return String.format("%s | [email protected]:%d", data, System.nanoTime()); 
      } 
     } 
    } 
} 

出力:

[email protected]:1312434325594730000 | [email protected]:1312434325594747000 
[email protected]:1312434325594750000 | [email protected]:1312434325594765000 
[email protected]:1312434325594768000 | [email protected]:1312434325594784000 
[email protected]:1312434325594787000 | [email protected]:1312434325594804000 
[email protected]:1312434325594806000 | [email protected]:1312434325594823000 
[email protected]:1312434325594826000 | [email protected]:1312434325594841000 
... 
+0

は、あなたの答えをいただき、ありがとうございます。交換器は私が必要とするもののように見え、あなたが正しいですが、私が指摘した他の解決策は良く見えません。最後の質問があります。複数のスレッドがあり、プロデューサが特定のスレッドにメッセージを送信したい場合、どうすればよいですか?スレッドごとに1つのExchangerインスタンスが必要ですか?もし愚かな質問であれば申し訳ありませんが、私は車輪を再発明したくありません。ありがとうございました! –

+0

ラファエル:質問をあなたが望むものの正確な詳細で更新するだけです。 (消費者/プロデューサのコードはちょうどノイズ、fyiです)。エクスチェンジャーは1:1ブロック、バッファリングされていない*ハンドオフ*、ちょうどあなたがあなたのQを正確に指定する必要がある理由、データがどのように流れようとしているかなどのいくつかの考えを与えるためです。あなたに1:1 *パイプラインを表示する*おもちゃ*。 1:N、またはN:1? java.util.concurrent Queuesを参照してください。 – alphazero

関連する問題