8

優先度を実装したいActionBlock<T>。だから私はPredicate<T>を使って条件付きでいくつかのTInputアイテムに優先順位をつけることができます。
私は Parallel Extensions Extras SamplesGuide to Implementing Custom TPL Dataflow Blocksを読みました。
しかし、このシナリオをどのように実装すればよいのか分かりません。
---------------------------- EDIT ------------------- --------
いくつかのタスクがあり、それらのタスクのうち5つは同時に実行できます。ユーザーがボタンを押すと、(述語関数に依存する)タスクの中で最も優先度が高いものが実行されます。
は、実際に私はこのコードActionBlockをカスタマイズする<T>

TaskScheduler taskSchedulerHighPriority; 
ActionBlock<CustomObject> actionBlockLow; 
ActionBlock<CustomObject> actionBlockHigh; 
... 
queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5); 
taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0); 
taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1); 
... 
actionBlockHigh = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, SingleProducerConstrained = false, TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, MaxMessagesPerTask = 1, TaskScheduler = taskSchedulerLow }); 
...  
if (predicate(customObject)) 
    actionBlockHigh.Post(customObject); 
else 
    actionBlockLow.Post(customObject); 

を書くしかし、優先順位がすべてで、影響かかりませんようです。
---------------------------- EDIT ------------------
私はこのコード行を使用しているという事実を見つける:

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerLow }); 

原因アプリケーションが正しくタスクの優先順位を観察したが1つのタスクのみがその間に示されている最初のコードブロックを使用して、一度に実行することができますアプリケーションが5つのタスクを同時に実行しますが、不適切な優先順位で実行されます。

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerHigh }); 
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerLow }); 

更新:svickする
タンク、私はtaskSchedulerLowためMaxMessagesPerTaskを指定する必要があります。

+2

優先順位を決定するのは何ですか?それは 'T 'とはまったく関係ないものですか?あるいは、優先度は 'T 'の固有/派生プロパティですか? – casperOne

+0

ConcurrentPriorityQueueを使用するカスタムバッファブロックを作成することも、カスタム非同期トランスフォーメーションブロックを作成することもできます。両方のオプションは自明ではありません。 @casperOneにも同意します。あなたの場合、優先順位はどういう意味ですか? –

答えて

7

あなたの質問には多くの詳細が含まれていないので、以下はあなたが必要と思われるものの推測です。

これを行う最も簡単な方法は、QueuedTaskScheduler from ParallelExtensionsExtrasで異なる優先度で実行している2つのActionBlockを持つことです。述語を使用して優先度の高いものにリンクし、次に優先度の低いものにリンクします。また、優先度が高いTaskが待機していないことを確認するには、低優先度ブロックのMaxMessagesPerTaskを設定します。

これはあなたが何ができるかの単なるスケッチである
static ITargetBlock<T> CreatePrioritizedActionBlock<T>(
    Action<T> action, Predicate<T> isPrioritizedPredicate) 
{ 
    var buffer = new BufferBlock<T>(); 

    var scheduler = new QueuedTaskScheduler(1); 

    var highPriorityScheduler = scheduler.ActivateNewQueue(0); 
    var lowPriorityScheduler = scheduler.ActivateNewQueue(1); 

    var highPriorityBlock = new ActionBlock<T>(
     action, new ExecutionDataflowBlockOptions 
     { 
      TaskScheduler = highPriorityScheduler 
     }); 
    var lowPriorityBlock = new ActionBlock<T>(
     action, new ExecutionDataflowBlockOptions 
     { 
      TaskScheduler = lowPriorityScheduler, 
      MaxMessagesPerTask = 1 
     }); 

    buffer.LinkTo(highPriorityBlock, isPrioritizedPredicate); 
    buffer.LinkTo(lowPriorityBlock); 

    return buffer; 
} 

、例えば、返されたブロックのCompletionが正しく動作しません:

コードでは、それは次のようになります。

+0

編集済みのタグ – Rzassar

+1

を読んでください。あなたのコードでは、優先度の低いブロックに 'MaxMessagesPerTask'を指定しません。私が言ったように、それをすることは非常に重要なことです。 – svick

関連する問題