2017-02-02 9 views
-1

Queue複数のスレッドでEnqueue()のキューが機能する必要がある場合、C#で使用するデータ構造は1つのメインスレッドでDequeue()にするだけです。マイスレッド構造は次のようになります。キューとスレッディング

  • メインスレッド - 消費者
  • サブスレッド1 - プロデューサー
  • サブスレッド2 - プロデューサー
  • サブThread3 - プロデューサー、私が持っている

サブスレッドとメインスレッドによって生成されたすべてのアイテムを保持する単一のQueue<T> queuequeue.Dequeue()が空になるまで。私は、この目的のために私のメインスレッドで呼び出される次の関数を持っています。

public void ConsumeItems() 
{ 
    while (queue.Count > 0) 
    { 
     var item = queue.Dequeue(); 
     ... 
    } 
} 

メインスレッドは、各スレッドのループを通って一度この関数を呼び出すと、私は確信して私は、スレッドセーフなマナーでqueueにアクセスしていますが、私はまた、可能な場合は、パフォーマンス上の理由からqueueをロックしないようにしたいようにしたいです。

+2

あなたは[system.collections.concurrent](https://msdn.microsoft.com/en-us/library/system.collections.concurrent(V = vs.110)から何かをしたいだろうとします。 aspx)。 – Equalsk

+1

私は明示的なキュー管理を避け、代わりにTPL Dataflowライブラリを見ていきます。 –

答えて

1

BlockingCollection<T>は、デフォルトではConcurrentQueue<T>に基づいています。キューはforeachのスレッドをブロックして、すぐにアイテムが利用可能になると、ブロック解除されます空である場合には、キューの外にアイテムを取得するには、.GetConsumingEnumerable()

foreachの内側から
public BlockingCollection<Item> queue = new BlockingCollection<Item>(); 

public void LoadItems() 
{ 
    var(var item in SomeDataSource()) 
    { 
     queue.Add(item); 
    } 
    queue.CompleteAdding(); 
} 

public void ConsumeItems() 
{ 
    foreach(var item in queue.GetConsumingEnumerable()) 
    { 
     ... 
    } 
} 

を使用します。 .CompleteAdding()が呼び出されると、foreachはキュー内のアイテムの処理を終了しますが、一度空になるとforeachブロックを終了します。

ただし、これを行う前に、TPL Dataflowを参照することをお勧めします。キューやスレッドをこれ以上管理する必要はありません。これにより、ロジックのチェーンを構築することができ、チェーン内の各ブロックに別々のレベルの並行性を持たせることができます。

public Task ProcessDataAsync(IEnumerable<SomeInput> input) 
{ 
    using(var outfile = new File.OpenWrite("outfile.txt")) 
    { 
     //Create a convert action that uses the number of processors on the machine to create parallel blocks for processing. 
     var convertBlock = new TransformBlock<SomeInput, string>(x => CpuIntensiveConversion(x), new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = Enviorment.ProcessorCount}); 

     //Create a single threaded action that writes out to the textwriter. 
     var writeBlock = new ActionBlock<string>(x => outfile.WriteLine(x)) 

     //Link the convert block to the write block. 
     convertBlock.LinkTo(writeBlock, new DataflowLinkOptions{PropagateCompletion = true}); 

     //Add items to the convert block's queue. 
     foreach(var item in input) 
     { 
       await convertBlock.SendAsync(); 
     } 

     //Tell the convert block we are done adding. This will tell the write block it is done processing once all items are processed. 
     convertBlock.Complete(); 

     //Wait for the write to finish writing out to the file; 
     await writeBlock.Completion; 
    } 
} 
+0

私は後ろに私の例を書いた。 1人のプロデューサー、多くの消費者。解決策は変わりません。例だけが必要です。 –

関連する問題