2016-07-12 27 views
0

アイテムのリスト(200k〜300k)を処理していますが、各アイテムの処理時間は2〜8秒です。時間を得るために、私はこのリストを並行して処理することができます。私は非同期コンテキストにいるように、私はこのようなものを使用します。C#Parallel Foreach + Async

public async Task<List<Keyword>> DoWord(List<string> keyword) 
{ 
    ConcurrentBag<Keyword> keywordResults = new ConcurrentBag<Keyword>(); 
    if (keyword.Count > 0) 
    { 
     try 
     { 
      var tasks = keyword.Select(async kw => 
      { 
       return await Work(kw).ConfigureAwait(false); 
      }); 

      keywordResults = new ConcurrentBag<Keyword>(await Task.WhenAll(tasks).ConfigureAwait(false)); 
     } 
     catch (AggregateException ae) 
     { 
      foreach (Exception innerEx in ae.InnerExceptions) 
      { 
       log.ErrorFormat("Core threads exception: {0}", innerEx); 
      } 
     } 
    } 
    return keywordResults.ToList(); 
} 

キーワードリストは常に8つの要素(上から来)が含まれていので、私は8で私のリスト8を処理しますが、この場合には、私は推測します7つのキーワードが3秒で処理され、8番目のキーワードが10秒で処理される場合、8つのキーワードの合計時間は10(私が間違っている場合は正しい)になります。 どうすればParallel.Foreachに近づけることができますか?つまり、1つが実行されている場合は8個のキーワードを起動し、1個を起動するとします。この場合、私は8つの作業プロセスを永久に持っていきます。何か案が ?

+0

は、あなたが使用して考えられている[ 'TPL DataFlow'](https://msdn.microsoft.com/en-us /library/hh228603(v=vs.110).aspx)アイテムを処理するパイプラインを設定するには? –

+0

これはあなたが探しているようですね。https://msdn.microsoft.com/en-us/library/system.threading.tasks.paralleloptions.maxdegreeofparallelism(v=vs.110).aspx –

+0

@MatthewWatson、私はちょうどそれが存在を学んだ、私はこれを確認します、ありがとう! – Gun

答えて

2

TPL Dataflowを使用してこれに近づける方法を示すサンプルコードがあります。

これをコンパイルするには、TPLデータフローをNuGet経由でプロジェクトに追加する必要があります。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace Demo 
{ 
    class Keyword // Dummy test class. 
    { 
     public string Name; 
    } 

    class Program 
    { 
     static void Main() 
     { 
      // Dummy test data. 
      var keywords = Enumerable.Range(1, 100).Select(n => n.ToString()).ToList(); 

      var result = DoWork(keywords).Result; 

      Console.WriteLine("---------------------------------"); 

      foreach (var item in result) 
       Console.WriteLine(item.Name); 
     } 

     public static async Task<List<Keyword>> DoWork(List<string> keywords) 
     { 
      var input = new TransformBlock<string, Keyword> 
      (
       async s => await Work(s), 
       // This is where you specify the max number of threads to use. 
       new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 } 
      ); 

      var result = new List<Keyword>(); 

      var output = new ActionBlock<Keyword> 
      (
       item => result.Add(item), // Output only 1 item at a time, because 'result.Add()' is not threadsafe. 
       new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 } 
      ); 

      input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true }); 

      foreach (string s in keywords) 
       await input.SendAsync(s); 

      input.Complete(); 
      await output.Completion; 

      return result; 
     } 

     public static async Task<Keyword> Work(string s) // Stubbed test method. 
     { 
      Console.WriteLine("Processing " + s); 

      int delay; 
      lock (rng) { delay = rng.Next(10, 1000); } 
      await Task.Delay(delay); // Simulate load. 

      Console.WriteLine("Completed " + s); 
      return await Task.Run(() => new Keyword { Name = s }); 
     } 

     static Random rng = new Random(); 
    } 
} 
1

これを行うための別のより簡単な方法を使用することですAsyncEnumerator NuGet Package

using System.Collections.Async; 

public async Task<List<Keyword>> DoWord(List<string> keywords) 
{ 
    var keywordResults = new ConcurrentBag<Keyword>(); 
    await keywords.ParallelForEachAsync(async keyword => 
    { 
     try 
     { 
      var result = await Work(keyword); 
      keywordResults.Add(result); 
     } 
     catch (AggregateException ae) 
     { 
      foreach (Exception innerEx in ae.InnerExceptions) 
      { 
       log.ErrorFormat("Core threads exception: {0}", innerEx); 
      } 
     } 
    }, maxDegreeOfParallelism: 8); 
    return keywordResults.ToList(); 
}