2017-03-27 6 views
1

セマフォのリトルブックからインスパイアされたセマフォを使用してプロデューサ - コンシューマの問題を実装することにしました。セマフォを使用するプロデューサ - コンシューマ

具体的には、任意のワーカースレッドを自由に停止できるようにしたいと考えています。 私は方法論を広範囲にテストしたが、何か問題が見つからない。次のコード

は、テストのためのプロトタイプで、コンソールアプリケーションとして実行することができます。

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using NUnit.Framework; 

public class ProducerConsumer 
{ 
    private static readonly int _numThreads = 5; 
    private static readonly int _numItemsEnqueued = 10; 
    private static readonly Semaphore _workItems = new Semaphore(0, int.MaxValue); 
    private static readonly ManualResetEvent _stop = new ManualResetEvent(false); 
    private static ConcurrentQueue<int> _queue; 

    public static void Main() 
    { 
     _queue = new ConcurrentQueue<int>(); 

     // Create and start threads. 
     for (int i = 1; i <= _numThreads; i++) 
     { 
      Thread t = new Thread(new ParameterizedThreadStart(Worker)); 

      // Start the thread, passing the number. 
      t.Start(i); 
     } 

     // Wait for half a second, to allow all the 
     // threads to start and to block on the semaphore. 
     Thread.Sleep(500); 

     Console.WriteLine(string.Format("Main thread adds {0} items to the queue and calls Release() {0} times.", _numItemsEnqueued)); 
     for (int i = 1; i <= _numItemsEnqueued; i++) 
     { 
      Console.WriteLine("Waking up a worker thread."); 
      _queue.Enqueue(i); 
      _workItems.Release(); //wake up 1 worker 
      Thread.Sleep(2000); //sleep 2 sec so it's clear the threads get unblocked 1 by 1 
     } 

     // sleep for 5 seconds to allow threads to exit 
     Thread.Sleep(5000); 
     Assert.True(_queue.Count == 0); 

     Console.WriteLine("Main thread stops all threads."); 
     _stop.Set(); 

     // wait a while to exit 
     Thread.Sleep(5000); 
     Console.WriteLine("Main thread exits."); 
     Console.WriteLine(string.Format("Last value of Semaphore was {0}.", _workItems.Release())); 
     Assert.True(_queue.Count == 0); 
     Console.WriteLine("Press Enter to exit."); 
     Console.ReadLine(); 
    } 

    private static void Worker(object num) 
    { 
     // Each worker thread begins by requesting the semaphore. 
     Console.WriteLine("Thread {0} begins and waits for the semaphore.", num); 
     WaitHandle[] wait = { _workItems, _stop }; 
     int signal; 
     while (0 == (signal = WaitHandle.WaitAny(wait))) 
     { 
      Console.WriteLine("Thread {0} becomes unblocked by Release() and has work to do.", num); 
      int res; 
      if (_queue.TryDequeue(out res)) 
      { 
       Console.WriteLine("Thread {0} dequeues {1}.", num, res); 
      } 
      else 
      { 
       throw new Exception("this should not happen."); 
      } 
     } 

     if (signal == 1) 
      Console.WriteLine("Thread {0} was stopped.", num); 

     Console.WriteLine("Thread {0} exits.", num); 
    } 
} 

今、私の質問のために、私は、私がセマフォにRelease()を呼び出すという仮定の下でWaitHandle.WaitAny(semaphore)を使用しています1労働者が目覚める。しかし、私はこれが実際に正しいことを文書で確認することはできません。これは本当ですか?

+2

はい、その点です。 – Andrey

+1

あなたは 'SemaforSlim'を使うべきです、より速いです。このタスクのために['TPL Dataflow'](https://msdn.microsoft.com/en-us/library/hh228601.aspx)ライブラリを調べることができます。あなたは' MaxDegreeOfParallelism'プロパティを使って作業者の量を調整するか、パイプラインを完全に停止するために2つのブロック間のリンクを廃棄してください – VMAtm

+0

ありがとう@VMAtm、それを調べます。 –

答えて

2

文書には、WaitOneの場合、1つのスレッドだけが信号を受信することが明示的に示されていないようです。マルチスレッド理論に慣れると、これはやや自明になります。 (Semaphoreを含むWaitHandle Sのリスト上で呼び出され、WaitAnySemaphoreで呼び出され

はい、WaitOneは、単一のスレッドによって受信されます。あなたはそれがあるので、ここでMSDNから参照したい場合は、Semaphore

which is:は、共有リソースへの排他的アクセスを待ち、オペレーティング・システム固有のオブジェクトをカプセル化し、WaitHandleの子クラスです。

したがって、明示的に記載されている方法では排他的アクセスを提供しない限り、そうです。

例の方法についてManualResetEventWaitOneは、すべての待機中のスレッドのbut documentation is explicit about itのブロックを解除します:イベントが発生した

通知し一つ以上の待機中のスレッドを。

+0

ありがとう、それは私が期待していた答えです。 'ManualResetEvent'の振る舞いのために、私は' Semaphore'の振る舞いに安心していたのです。 –

関連する問題