2012-06-27 33 views
8

順番に(各フローで)処理する必要のある複数のフローに依存するタスクをキューしたい。フローは並行して処理できます。スレッドプールによって処理される依存タスクをキューに入れるC#

具体的には、2つのキューが必要で、各キューのタスクを順番に処理したいとします。ここでは、目的の動作を説明するためのサンプル擬似コードである:ここでは

Queue1_WorkItem wi1a=...; 

enqueue wi1a; 

... time passes ... 

Queue1_WorkItem wi1b=...; 

enqueue wi1b; // This must be processed after processing of item wi1a is complete 

... time passes ... 

Queue2_WorkItem wi2a=...; 

enqueue wi2a; // This can be processed concurrently with the wi1a/wi1b 

... time passes ... 

Queue1_WorkItem wi1c=...; 

enqueue wi1c; // This must be processed after processing of item wi1b is complete 

は、作業項目間の依存関係を示す矢印が付いた図である。

enter image description here

の質問はどのように私はこのC#4.0を使用しないでくださいされて/ .NET 4.0?今私は2つのワーカースレッド、1つのキューに1つがあり、私は各キューに対してBlockingCollection<>を使用します。代わりに、.NETスレッドプールを活用して、ワーカースレッドが(フロー間で)アイテムを同時に処理するようにしたいが、フロー内では連続して処理する。言い換えれば、例えばwi1bがwi1aの完了に依存し、wi1bが到着したときにwi1aを完了して覚えておく必要がないことを示すことができます。言い換えれば、私は、「私はすでにキュー1のために提出した他のアイテムと連続的に処理されるが、他のキューに提出された作業アイテムと並行して処理されるべきである。

この説明が意味をなされることを望みます。コメントがない場合は、コメントで質問してください。私はこの質問を適宜更新します。

読んでいただきありがとうございます。

アップデート:ここで、これまでの「欠陥」ソリューションを要約する

は、私が使用することはできません解答セクションと、私はそれらを使用できない理由(複数可)からの解決策は以下のとおりです。

TPLタスクでは、ContinueWith()の先行タスクを指定する必要があります。私は、新しいタスクを提出するときに、各キューの先行タスクに関する知識を維持したくありません。

TDF ActionBlocksは有望ですが、ActionBlockに投稿されたアイテムは並行して処理されるようです。私は、特定のキューの項目を順次処理する必要があります。

アップデート2:

RE:ActionBlocks

1つにMaxDegreeOfParallelismオプションを設定すると、単一のActionBlockに提出された作業項目の並列処理を妨げるように思われます。したがって、待ち行列ごとにActionBlockを持つことで、MicrosoftからTDFライブラリをインストールして展開する必要があり、純粋な.NET 4.0ソリューションを望んでいたという唯一の欠点が私の問題を解決しているようです。これまでのところ、これは受験者が受け入れ可能な答えです。誰かが、キューごとにワーカースレッドに縮退しない純粋な.NET 4.0ソリューション(これは私がすでに使用しています)でこれを行う方法を理解できない限りです。

+0

Task/ContinueWithを見ましたか? –

+0

私は、ContinueWithが前のタスクの知識を必要としていることに気付きました。元の質問に指定されている先行タスクを追跡する必要はありません。その理由の1つは、キューごとに行う必要があるためです。代わりに、私は提出の時点から "発砲と忘却"し、タスク処理述語のエラーとエラーの伝播を処理したいと考えています。言い換えれば、私は提出時点で最小限の状態、つまり作業項目とそれを送信すべき待ち行列を望みます。 –

答えて

4

多くのキューがあり、スレッドを縛りたくないと思います。キューごとにActionBlockを設定できます。 ActionBlockは、必要なもののほとんどを自動化します。作業項目を順番に処理し、作業が保留中の場合にのみタスクを開始します。保留中の作業がない場合、タスク/スレッドはブロックされません。

+0

不幸にも、提供したリンクのドキュメントに基づいて、このソリューションにはまだリリースされていない.NET 4.5が必要です。私は、この質問に添付されたタグで指定されている.NET 4.0を活用するソリューションを探しています。 –

+0

TPL Dataflowは容易に入手できます(http://msdn.microsoft.com/en-us/devlabs/gg585582.aspx)。 – usr

+0

TPLデータフローをインストールし、http://msdn.microsoft.com/en-us/library/hh462696(v=vs.110).aspxでサンプルコードを試しました。アクションブロックに投稿されたアイテムは、あなたの返信に記載されているように、並行して処理され、シリアルに処理されないように見えます。これは私が望むものではありません。キューごとのアイテムは連続して処理する必要があります。 –

3

最も良い方法は、Task Parallel Library (TPL)Continuationsです。継続は、タスクの流れを作成するだけでなく、例外を処理することもできます。これはTPLへのgreat introductionです。しかし、あなたにいくつか考えを与える...先行タスクの終了(エラーまたは成功したが)あなたが

Task task1 = Task.Factory.StartNew(() => Console.WriteLine("Antecedant Task")); 
Task task2 = task1.ContinueWith(antTask => Console.WriteLine("Continuation...")); 

ようContinueWith方法を使用することができるとき

あなたは2番目のタスクを開始するために今すぐ

Task task = Task.Factory.StartNew(() => 
{ 
    // Do some work here... 
}); 

を使用してTPLタスクを開始することができますすぐにtask1が完了するか、失敗するか、キャンセルされます。task2が「火災」し、実行を開始します。 task1が2行目のコードtask2に到達する前に完了した場合、すぐに実行するようにスケジュールされていることに注意してください。 2番目のラムダに渡される引数antTaskは、先行タスクへの参照です。あなたはまた、先行タスク

Task.Factory.StartNew<int>(() => 1) 
    .ContinueWith(antTask => antTask.Result * 4) 
    .ContinueWith(antTask => antTask.Result * 4) 
    .ContinueWith(antTask =>Console.WriteLine(antTask.Result * 4)); // Prints 64. 

ノートからの継続の結果を渡すことができ、より詳細な例についてthis link ...

を参照してください。最初のリンクの例外処理については、TPLの迷子になる可能性があるので、必ず読んでください。

特に、あなたが望むものを見て最後に1つは、子タスクです。子タスクは、AttachedToParentとして作成されます。この場合、すべての子タスクが完了するまで継続は実行されません。

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; 
Task.Factory.StartNew(() => 
{ 
    Task.Factory.StartNew(() => { SomeMethod() }, atp); 
    Task.Factory.StartNew(() => { SomeOtherMethod() }, atp); 
}).ContinueWith(cont => { Console.WriteLine("Finished!") }); 

私はこれが役立つことを望みます。

編集:ConcurrentCollections、特にBlockngCollection<T>をご覧ください。だからあなたの場合には、あなたがすでに持っているデザインが良いと働いているように見えます

public class TaskQueue : IDisposable 
{ 
    BlockingCollection<Action> taskX = new BlockingCollection<Action>(); 

    public TaskQueue(int taskCount) 
    { 
     // Create and start new Task for each consumer. 
     for (int i = 0; i < taskCount; i++) 
      Task.Factory.StartNew(Consumer); 
    } 

    public void Dispose() { taskX.CompleteAdding(); } 

    public void EnqueueTask (Action action) { taskX.Add(Action); } 

    void Consumer() 
    { 
     // This seq. that we are enumerating will BLOCK when no elements 
     // are avalible and will end when CompleteAdding is called. 
     foreach (Action action in taskX.GetConsumingEnumerable()) 
      action(); // Perform your task. 
    } 
} 
+0

ここでの欠陥は、私がtask1のメモリをtask2が送信された時点で保持したくないということです。私はちょうど私が最後に提出した項目の後に、それが何であっても、task2がqueue1に提出されるべきだと言いたい。したがって、次のタスクの提出時点で先行タスク(タスク1)を把握しておらず、また追跡したくない(タスク2)。 –

+0

私が追加した子タスクのセクションを見てください。これはあなたが必要とするものを達成するのに役立ちます。すべてのベスト... – MoonKnight

+0

'SomeMethod()'と 'SomeOtherMethod()'は連続して実行されますか?私の必要性のために、彼らは連続して実行されなければならない。また、私はすべてのタスクを同時に利用できるわけではありません。作業項目が入ってきてスケジュールする必要があるので、複数の項目で 'StartNew()'を実行することはできません。 –

0

のようなものを使用することがあります。ワーカースレッド(キューごとに1つ)は長時間実行されるため、タスクを代わりに使用する場合は​​と指定して、専用のワーカースレッドを取得します。

しかし、ここでThreadPoolを使用する必要はありません。長時間の作業には多くの利点はありません。

+1

スレッドは長時間実行されていますが、タスク自体は短いです。また、これを多くのキューにスケールすると、すべてのキューが同時に処理されるため、オーバーサブスクリプションが発生する可能性があるため、分解されます。スレッドプールでは、これを心配する必要はありません。パラレルキュー内のタスクは、最小のコンテキスト切り替えでスレッドプールによって並列に処理できるためです。 –

1

親タスクをどこかに格納する必要があるという事実を隠しながら、TPLベースの.NET 4.0ソリューションが可能です。例:

class QueuePool 
{ 
    private readonly Task[] _queues; 

    public QueuePool(int queueCount) 
    { _queues = new Task[queueCount]; } 

    public void Enqueue(int queueIndex, Action action) 
    { 
     lock (_queues) 
     { 
      var parent = _queue[queueIndex]; 
      if (parent == null) 
       _queues[queueIndex] = Task.Factory.StartNew(action); 
      else 
       _queues[queueIndex] = parent.ContinueWith(_ => action()); 
     } 
    } 
} 

これは、アイデアを説明するために、すべてのキューに単一のロックを使用しています。ただし、プロダクションコードでは、競合を減らすためにキューごとにロックを使用します。

+0

親タスクが既に完了している場合、ContinueWithはロック内のアクションを開始するため、これは良い解決策ではありません。 –

関連する問題