2012-04-04 13 views
0

http://en.wikipedia.org/wiki/Producer-consumer_problemによると、私はセマフォを使用してP/Cの問題をシミュレートしたいです。私はデッドロックになり、問題は何か分からない。セマフォを使用するプロデューサ/コンシューマ。デッドロックを取得する

セマフォが失われたウェイクアップコールの問題を解決するためのセマフォを使用して

public static void main(String[] args) { 
     CustomBlockingQueue blockingQueue = new CustomBlockingQueue(); 
     new Thread(new Producer(blockingQueue)).start(); 
     new Thread(new Consumer(blockingQueue)).start(); 
    } 
} 

@SuppressWarnings("serial") 
class CustomBlockingQueue extends LinkedList<Object> { 
    private static final int MAX_SIZE = 10; 

    private Semaphore mutex = new Semaphore(1); 
    private Semaphore fillCount = new Semaphore(0); 
    private Semaphore emptyCount = new Semaphore(MAX_SIZE); 

    @Override 
    public boolean offer(Object e) { 
     try { 
      mutex.acquire(); 
     } catch (InterruptedException e2) { 
      e2.printStackTrace(); 
     } 
     boolean result = super.offer(e); 
     System.out.println("offer " + size()); 
     try { 
      fillCount.release(); 
      emptyCount.acquire(); 
      mutex.release(); 
     } catch (InterruptedException e1) { 
      e1.printStackTrace(); 
     } 
     return result; 
    } 

    @Override 
    public Object poll() { 
     try { 
      mutex.acquire(); 
     } catch (InterruptedException e2) { 
      e2.printStackTrace(); 
     } 
     Object result = super.poll(); 
     System.out.println("poll " + size()); 
     try { 
      emptyCount.release(); 
      fillCount.acquire(); 
      mutex.release(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     return result; 
    } 
} 

class Producer implements Runnable { 
    private CustomBlockingQueue blockingQueue; 
    private Random random = new Random(); 

    public Producer(CustomBlockingQueue blockingQueue) { 
     this.blockingQueue = blockingQueue; 
    } 

    @Override 
    public void run() { 
     while (!Thread.currentThread().isInterrupted()) { 
      try { 
       TimeUnit.SECONDS.sleep(random.nextInt(2)); 
       blockingQueue.offer(new Object()); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

class Consumer implements Runnable { 
    private CustomBlockingQueue blockingQueue; 
    private Random random = new Random(); 

    public Consumer(CustomBlockingQueue blockingQueue) { 
     this.blockingQueue = blockingQueue; 
    } 

    @Override 
    public void run() { 
     while (!Thread.currentThread().isInterrupted()) { 
      try { 
       TimeUnit.SECONDS.sleep(random.nextInt(4)); 
       blockingQueue.poll(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

。以下の解決法では、問題を解決するために2つのセマフォー、fillCountとemptyCountを使用します。 fillCountはバッファで読み込む項目の数、emptyCountは項目を書き込むことができるバッファ内の空きスペースの数です。新しい項目がバッファに入れられると、fillCountがインクリメントされ、emptyCountが減少します。プロデューサがその値がゼロの間にemptyCountをデクリメントしようとすると、プロデューサはスリープ状態になります。次回アイテムが消費されると、emptyCountがインクリメントされ、プロデューサが起動します。消費者は同様に働く。

+1

あなた自身の頭痛を救う:スレッドセーフなキューを使用する。 – jldupont

+0

合意しました - これはすべてあなたのためにライブラリに実装されています - それを難し​​い方法で行う必要はありません! – DNA

+0

意図的にセマフォー – ASD

答えて

2

をあなたのロックが間違った順序である:

を提供するためにする必要があります

世論調査のために必要な
 emptyCount.acquire(); 
     mutex.acquire(); 
     doModification(); 
     mutex.release(); 
     fillCount.release(); 

同様の変化:

 fillCount.acquire(); 
     mutex.acquire(); 
     doModification(); 
     mutex.release(); 
     emptyCount.release(); 

あなたの実装では、セマフォを解放するために他のスレッドがmutexを待つ可能性があるため、mutexを保持している間にセマフォを待っています。

+0

+1セマフォP-Cキューは60年代以降教室で教えられており、数多くの例があります。 –

2

代わりに、mutexのロックを取り、あなたを待っているBlockingQueueを使用することをお勧めします。

脇に、(疑似割り込みとは対照的に)プロデューサ/コンシューマ競合条件を示す古いページがあります。私はそれはあなたを助けるかわからないので、しかし、私の実装では、セマフォを使用していません:

http://256stuff.com/gray/docs/misc/producer_consumer_race_conditions/

+0

私はBlockingQueueについて知っていますが、意図的にセマフォを使ってこの問題を解決したいと思います。 – ASD

+0

Figured @ASD。私はちょうどそれを指摘したいと思った。私はまた、FYIのプロデューサー/消費者ページへのリンクを貼っています。 – Gray

+0

返信いただきありがとうございますが、私はすでにこの方法でこの問題を解決しています(ただしあなたとは少し異なります)。 – ASD

関連する問題