0

BlockingCollection<T>を使用してプロデューサ/コンシューマパターンを実装しようとしています。そのため、テストするための簡単なコンソールアプリケーションを作成しました。BlockingCollection.GetConsumingEnumerableを正しく使用するには?

public class Program 
{ 
    public static void Main(string[] args) 
    { 
     var workQueue = new WorkQueue(); 
     workQueue.StartProducingItems(); 
     workQueue.StartProcessingItems(); 

     while (true) 
     { 

     } 
    } 
} 

public class WorkQueue 
{ 
    private BlockingCollection<int> _queue; 
    private static Random _random = new Random(); 

    public WorkQueue() 
    { 
     _queue = new BlockingCollection<int>(); 

     // Prefill some items. 
     for (int i = 0; i < 100; i++) 
     { 
      //_queue.Add(_random.Next()); 
     } 
    } 

    public void StartProducingItems() 
    { 
     Task.Run(() => 
     { 
      _queue.Add(_random.Next()); // Should be adding items to the queue constantly, but instead adds one and then nothing else. 
     }); 
    } 

    public void StartProcessingItems() 
    { 
     Task.Run(() => 
     { 
      foreach (var item in _queue.GetConsumingEnumerable()) 
      { 
       Console.WriteLine("Worker 1: " + item); 
      } 
     }); 

     Task.Run(() => 
     { 
      foreach (var item in _queue.GetConsumingEnumerable()) 
      { 
       Console.WriteLine("Worker 2: " + item); 
      } 
     }); 
    } 
} 

は、しかし、私のデザインの3つの問題があります

  • 私は私の主な方法で待っ/ブロックの正しい方法を知りません。単純な空のwhileループを実行すると、アプリケーションが終了しないことを確認するために単純に浪費するひどく非効率的なCPU使用率があるようです。

  • 私のデザインには別の問題もあります。この単純なアプリケーションでは、アイテムを無期限に生産し、決して止めるべきではありません。現実世界の設定では、最終的に終了したい(たとえば、処理するファイルが不足している)。その場合、どのようにMainメソッドで終了するのを待つ必要がありますか? StartProducingItemsasyncとし、awaitとしますか?

  • GetConsumingEnumerableまたはAddのいずれかが期待どおりに機能していません。プロデューサーは常にアイテムを追加する必要がありますが、アイテムを1つ追加してからもう追加しません。この1つのアイテムは、消費者の1人によって処理されます。どちらの消費者も、追加するアイテムの待機をブロックしますが、どちらも存在しません。私はTakeメソッドを知っていますが、whileループでTakeを再び回転させるのはかなり無駄で非効率的です。 CompleteAddingメソッドがありますが、それ以外のものは追加できません。試してみると例外がスローされるため、適切ではありません。

    enter image description here

    EDIT:

    私は」私は両方の消費者が実際のブロックであると私はデバッグ中にスレッドを切り替えることができますように、新しいアイテムを待っていることを確実に知る

コメントの1つで変更を提案しましたが、Task.WhenAllはまだすぐに返されます。

public Task StartProcessingItems() 
{ 
    var consumers = new List<Task>(); 

    for (int i = 0; i < 2; i++) 
    { 
     consumers.Add(Task.Run(() => 
     { 
      foreach (var item in _queue.GetConsumingEnumerable()) 
      { 
       Console.WriteLine($"Worker {i}: " + item); 
      } 
     })); 
    } 

    return Task.WhenAll(consumers.ToList()); 
} 
+0

'プロデューサーは常にアイテムを追加する必要がありますが、アイテムを1つ追加してからもう追加しません。この1つのアイテムは、消費者の1人によって処理されます。両方の消費者は、追加するアイテムの待機をブロックしますが、どれも「そうです、はい。もう何を期待していましたか?消費者はアイテムが追加されるまでブロックされます。そのため、これはブロッキングコレクションと呼ばれます –

+0

いいえ、問題はプロデューサが1つのアイテムを追加してから停止することです。 – user9993

+0

それはあなたがそれをコード化した方法です。あなたは、単一のアイテムを追加するタスクを開始しています。あなたはループや何かが欠けていますか? –

答えて

4

GetConsumingEnumerable()がブロックしています。あなたは常にキューに追加したい場合は、ループ内で_queue.Addへの呼び出しを置く必要があります。

public void StartProducingItems() 
{ 
    Task.Run(() => 
    { 
     while (true) 
      _queue.Add(_random.Next()); 
    }); 
} 

あなたが押されている前に、仕上げからメインスレッドを防ぐためにConsole.ReadLine()メソッドを呼び出すことができMain()方法についてキー:

public static void Main(string[] args) 
{ 
    var workQueue = new WorkQueue(); 
    workQueue.StartProducingItems(); 
    workQueue.StartProcessingItems(); 

    Console.WriteLine("Press a key to terminate the application..."); 
    Console.ReadLine(); 
} 
+0

私がenterを押すとすぐに、それは終了します。より良い方法が必要です。おそらく、StartProducingItemsを作ることは非同期であるべきですか? – user9993

+0

@ user9993 'StartProcessingItems'で作成したタスクを公開し、' Main() 'で' .Wait() 'を呼び出します。複数のワーカーがいる場合は、「Task.WhenAll'を使用して集約タスクを作成し、それを公開してください。 –

+0

WorkQueueクラスからタスクを公開すると、それらを待つことができます。しかし、プロデューサーにプロダクションを止めるようにどのように伝えますか? – mm8

関連する問題