2013-02-06 11 views
11

私はいくつかのコードを書くことを試みています。これは、いくつかの異なるサーバーへのWebサービス呼び出しを並行して実行するため、TPLが当然の選択肢のようです。タスク並列ライブラリ指定された結果を持つWaitAny

私のWebサービスコールのうちの1つだけが、私が望む結果を返すだけで、他のものはそうしないでしょう。私は効果的にTask.WaitAnyを持っているが、条件に合致する最初のTaskが返ってきたら、ブロックを解除するしか方法を見つけようとしていません。

私はWaitAnyで試してみましたが、フィルタを配置する場所が分からなかった。私はこれまでのところ得た:

public void SearchServers() 
{ 
    var servers = new[] {"server1", "server2", "server3", "server4"}; 
    var tasks = servers 
       .Select(s => Task<bool>.Factory.StartNew(server => CallServer((string)server), s)) 
       .ToArray(); 

    Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"? 

    //Omitted: cancel any outstanding tasks since the correct server has been found 
} 

private bool CallServer(string server) 
{ 
    //... make the call to the server and return the result ... 
} 

編集:クイック明確化を念のために上記のいずれかの混乱があります。私は、次の操作を実行しようとしている:

  1. サーバーごとに、サーバーがtrueを返すまで待って、
  2. のいずれかを、それをチェックするためにTaskを開始(1台のサーバーの唯一最大が今までtrueを返します)
  3. または、すべてのサーバーがfalseを返すまで待ち​​ます。つまり、一致するものはありません。

答えて

9

私は、各TaskためContinueWithを指定して、結果をチェックしていると考えることができるものの最高の、そしてtrue場合は他のタスクをキャンセルします。タスクをキャンセルするには、CancellationTokenを使用します。

var tasks = servers 
    .Select(s => Task.Run(...) 
     .ContinueWith(t => 
      if (t.Result) { 
       // cancel other threads 
      } 
     ) 
    ).ToArray(); 

UPDATE:右のタスクが完了するまで、代替ソリューションはWaitAnyになります(しかし、それはいくつかの欠点があり、例えばリストから、完成したタスクを削除し、残りのものから新しい配列を作成するには、かなり重いです操作):

List<Task<bool>> tasks = servers.Select(s => Task<bool>.Factory.StartNew(server => CallServer((string)server), s)).ToList(); 

bool result; 
do { 
    int idx = Task.WaitAny(tasks.ToArray()); 
    result = tasks[idx].Result; 
    tasks.RemoveAt(idx); 
} while (!result && tasks.Count > 0); 

// cancel other tasks 

UPDATE 2:今日で私はRxのでそれを行うだろう:

[Fact] 
public async Task AwaitFirst() 
{ 
    var servers = new[] { "server1", "server2", "server3", "server4" }; 
    var server = await servers 
     .Select(s => Observable 
      .FromAsync(ct => CallServer(s, ct)) 
      .Where(p => p) 
      .Select(_ => s) 
     ) 
     .Merge() 
     .FirstAsync(); 
    output.WriteLine($"Got result from {server}"); 
} 

private async Task<bool> CallServer(string server, CancellationToken ct) 
{ 
    try 
    { 
     if (server == "server1") 
     { 
      await Task.Delay(TimeSpan.FromSeconds(1), ct); 
      output.WriteLine($"{server} finished"); 
      return false; 
     } 
     if (server == "server2") 
     { 
      await Task.Delay(TimeSpan.FromSeconds(2), ct); 
      output.WriteLine($"{server} finished"); 
      return false; 
     } 
     if (server == "server3") 
     { 
      await Task.Delay(TimeSpan.FromSeconds(3), ct); 
      output.WriteLine($"{server} finished"); 
      return true; 
     } 
     if (server == "server4") 
     { 
      await Task.Delay(TimeSpan.FromSeconds(4), ct); 
      output.WriteLine($"{server} finished"); 
      return true; 
     } 
    } 
    catch(OperationCanceledException) 
    { 
     output.WriteLine($"{server} Cancelled"); 
     throw; 
    } 

    throw new ArgumentOutOfRangeException(nameof(server)); 
} 

テストでは、私のマシン(THAに3.32秒かかりますInterlocked.CompareExchangeはまさにそれを行います使用

server1 finished 
server2 finished 
server3 finished 
server4 Cancelled 
Got result from server3 
+0

私はあなたのコードサンプルを試してきました。私は 'WaitAny'を使うことはできません。なぜなら、サーバが正しいものではなくても、最初の' Task'が完了したときに戻ってくるからです。私も 'WaitAll'を使うことはできません。さもなければ、正しいものを見つけた後でも、すべてのタスクが完了するのを待たなければなりません。理想的には、「いずれかのタスクが真、またはすべてのタスクが完了するまで待ちます(つまり、いずれのサーバーも一致しない)」。私がそれを達成する方法はありますか? –

+0

私は 'WaitAll'はキャンセルされた仕事でも動作すると思います。 –

+0

私の答えは更新されましたが、今は@svickの答えが好きです。 –

1

、1つのタスクのみがserverReturedData

public void SearchServers() 
     { 
      ResultClass serverReturnedData = null; 
      var servers = new[] {"server1", "server2", "server3", "server4"}; 
      var tasks = servers.Select(s => Task<bool>.Factory.StartNew(server => 
      { 
       var result = CallServer((string)server), s); 
       Interlocked.CompareExchange(ref serverReturnedData, result, null); 

      }).ToArray(); 

      Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"? 
     // 
     // use serverReturnedData as you want. 
     // 
     } 
を書くことができるようになります:tはそれが第四サーバーを待たなかった)、私は次の出力を得た意味します

編集:Jasd氏によると、上記のコードは変数serverReturnedDataが有効な値(サーバがnull値を返した場合、これが起きる可能性がある)の前に戻って、カスタムオブジェクトに結果をラップできることを保証します。

+0

'Task.WaitAny(tasks);'(serverReturnedData')は変更できます(完了する他のタスクのため)。さらに、完了する最初のタスクは、「真」を返すタスクであることが保証されていません。 –

+0

上記のコードは、返される最初のタスクがnull値を返した場合、ローカル変数に最初の非NULL値が格納されることを保証します。しかし、これは、カスタムオブジェクトに結果をラップすることで簡単に解決できます。 – DVD

+0

したがって、まずboolは値型でnullにはできません。そのため、Interlocked.CompareExchangeの初期状態と第3引数をfalseに変更したい場合があります。さらに、(Task.WaitAny(tasks);で待っている)最初のタスクは 'true'を返すタスクであるとは限りません。しかし、IMOは、「真実」を返す最初の仕事を待つことを望んでいる。 –

4

OrderByCompletion() from the AsyncEx libraryを使用すると、完了したタスクが返されます。ProxifyByCompletionは次のように実装されて

public static async Task<T> GetFirstResult<T>(
this IEnumerable<Func<CancellationToken, Task<T>>> taskFactories, 
Action<Exception> exceptionHandler, 
Predicate<T> predicate) 
{ 
    T ret = default(T); 
    var cts = new CancellationTokenSource(); 
    var proxified = taskFactories.Select(tf => tf(cts.Token)).ProxifyByCompletion(); 
    int i; 
    for (i = 0; i < proxified.Length; i++) 
    { 
     try 
     { 
      ret = await proxified[i].ConfigureAwait(false); 
     } 
     catch (Exception e) 
     { 
      exceptionHandler(e); 
      continue; 
     } 
     if (predicate(ret)) 
     { 
      break; 
     } 
    } 

    if (i == proxified.Length) 
    { 
     throw new InvalidOperationException("No task returned the expected value"); 
    } 
    cts.Cancel(); //we have our value, so we can cancel the rest of the tasks 
    for (int j = i+1; j < proxified.Length; j++) 
    { 
     //observe remaining tasks to prevent process crash 
     proxified[j].ContinueWith(
     t => exceptionHandler(t.Exception), TaskContinuationOptions.OnlyOnFaulted) 
        .Forget(); 
    } 
    return ret; 
} 

public static Task<T>[] ProxifyByCompletion<T>(this IEnumerable<Task<T>> tasks) 
{ 
    var inputTasks = tasks.ToArray(); 
    var buckets = new TaskCompletionSource<T>[inputTasks.Length]; 
    var results = new Task<T>[inputTasks.Length]; 
    for (int i = 0; i < buckets.Length; i++) 
    { 
     buckets[i] = new TaskCompletionSource<T>(); 
     results[i] = buckets[i].Task; 
    } 
    int nextTaskIndex = -1; 
    foreach (var inputTask in inputTasks) 
    { 
     inputTask.ContinueWith(completed => 
     { 
      var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)]; 
      if (completed.IsFaulted) 
      { 
       Trace.Assert(completed.Exception != null); 
       bucket.TrySetException(completed.Exception.InnerExceptions); 
      } 
      else if (completed.IsCanceled) 
      { 
       bucket.TrySetCanceled(); 
      } 
      else 
      { 
       bucket.TrySetResult(completed.Result); 
      } 
     }, CancellationToken.None, 
      TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); 
    } 
    return results; 
} 

そしてForgetはに空の方法であり、一般的なソリューションはsvickの回答に基づいています。ここ

var tasks = servers 
    .Select(s => Task.Factory.StartNew(server => CallServer((string)server), s)) 
    .OrderByCompletion(); 

foreach (var task in tasks) 
{ 
    if (task.Result) 
    { 
     Console.WriteLine("found"); 
     break; 
    } 
    Console.WriteLine("not found yet"); 
} 

// cancel any outstanding tasks since the correct server has been found 
+1

'task.Result'は次のタスクが完了するまで現在のスレッドをブロックしていませんか?もし現在のスレッドが、例えば、次のようなものであれば、これは別のスレッドでも行わなければならないかもしれません。 UIスレッド? –

+1

@ジャスドはい、そうです。しかし、問題は改善された 'WaitAny()'を要求します。だから、私はこれがUIアプリケーションではない、またはすでに別のスレッドで実行されていると仮定しています。 – svick

+0

ああ、私は、thxを参照してください。 –

1

:あなたのコードのようなものを見ることができます抑制するCS4014:

public static void Forget(this Task task) //suppress CS4014 
{ 
} 
関連する問題