セマフォのリトルブックからインスパイアされたセマフォを使用してプロデューサ - コンシューマの問題を実装することにしました。セマフォを使用するプロデューサ - コンシューマ
具体的には、任意のワーカースレッドを自由に停止できるようにしたいと考えています。 私は方法論を広範囲にテストしたが、何か問題が見つからない。次のコード
は、テストのためのプロトタイプで、コンソールアプリケーションとして実行することができます。
using System;
using System.Collections.Concurrent;
using System.Threading;
using NUnit.Framework;
public class ProducerConsumer
{
private static readonly int _numThreads = 5;
private static readonly int _numItemsEnqueued = 10;
private static readonly Semaphore _workItems = new Semaphore(0, int.MaxValue);
private static readonly ManualResetEvent _stop = new ManualResetEvent(false);
private static ConcurrentQueue<int> _queue;
public static void Main()
{
_queue = new ConcurrentQueue<int>();
// Create and start threads.
for (int i = 1; i <= _numThreads; i++)
{
Thread t = new Thread(new ParameterizedThreadStart(Worker));
// Start the thread, passing the number.
t.Start(i);
}
// Wait for half a second, to allow all the
// threads to start and to block on the semaphore.
Thread.Sleep(500);
Console.WriteLine(string.Format("Main thread adds {0} items to the queue and calls Release() {0} times.", _numItemsEnqueued));
for (int i = 1; i <= _numItemsEnqueued; i++)
{
Console.WriteLine("Waking up a worker thread.");
_queue.Enqueue(i);
_workItems.Release(); //wake up 1 worker
Thread.Sleep(2000); //sleep 2 sec so it's clear the threads get unblocked 1 by 1
}
// sleep for 5 seconds to allow threads to exit
Thread.Sleep(5000);
Assert.True(_queue.Count == 0);
Console.WriteLine("Main thread stops all threads.");
_stop.Set();
// wait a while to exit
Thread.Sleep(5000);
Console.WriteLine("Main thread exits.");
Console.WriteLine(string.Format("Last value of Semaphore was {0}.", _workItems.Release()));
Assert.True(_queue.Count == 0);
Console.WriteLine("Press Enter to exit.");
Console.ReadLine();
}
private static void Worker(object num)
{
// Each worker thread begins by requesting the semaphore.
Console.WriteLine("Thread {0} begins and waits for the semaphore.", num);
WaitHandle[] wait = { _workItems, _stop };
int signal;
while (0 == (signal = WaitHandle.WaitAny(wait)))
{
Console.WriteLine("Thread {0} becomes unblocked by Release() and has work to do.", num);
int res;
if (_queue.TryDequeue(out res))
{
Console.WriteLine("Thread {0} dequeues {1}.", num, res);
}
else
{
throw new Exception("this should not happen.");
}
}
if (signal == 1)
Console.WriteLine("Thread {0} was stopped.", num);
Console.WriteLine("Thread {0} exits.", num);
}
}
今、私の質問のために、私は、私がセマフォにRelease()
を呼び出すという仮定の下でWaitHandle.WaitAny(semaphore)
を使用しています1労働者が目覚める。しかし、私はこれが実際に正しいことを文書で確認することはできません。これは本当ですか?
はい、その点です。 – Andrey
あなたは 'SemaforSlim'を使うべきです、より速いです。このタスクのために['TPL Dataflow'](https://msdn.microsoft.com/en-us/library/hh228601.aspx)ライブラリを調べることができます。あなたは' MaxDegreeOfParallelism'プロパティを使って作業者の量を調整するか、パイプラインを完全に停止するために2つのブロック間のリンクを廃棄してください – VMAtm
ありがとう@VMAtm、それを調べます。 –