-5

私は500.000.000行のファイルを持っています。c#マルチスレッド処理100個のバッチファイルに大きなファイル行

行は最大10文字の文字列です。

マルチスレッドと100のバッチを使用してこのファイルを処理するにはどうすればよいですか?

+0

、顔をしていますここhttp://stackoverflow.com/a/13731823/5062791 – ColinM

+0

http://cc.davelozinski.com/c-sharp/the-fastest-way-to-read-and-process-text-files –

+1

コードを書くそれらを比較してください.....または良い文書を読んでください。良い質問をする方法... –

答えて

1

は必要ありません。あなたのコードは次のようになります。

using (var input = new StreamReader(File.OpenRead(@"c:\path\to\my\file.txt"))) 
{ 
    Parallel.ForEach(
     input.ReadLines().TakeChunks(100), 
     new ParallelOptions() { MaxDegreeOfParallelism = 8 /* better be number of CPU cores */ }, 
     batchOfLines => { 
      DoMyProcessing(batchOfLines); 
     }); 
} 

これが機能するために、あなたは、IEnumerable<T>上の拡張メソッドのカップルと列挙子のカップルを必要とする定義された次のようにバッチの

public static class EnumerableExtensions 
{ 
    public static IEnumerable<string> ReadLines(this StreamReader input) 
    { 
     return new LineReadingEnumerable(input); 
    } 

    public static IEnumerable<IReadOnlyList<T>> TakeChunks<T>(this IEnumerable<T> source, int length) 
    { 
     return new ChunkingEnumerable<T>(source, length); 
    } 

    public class LineReadingEnumerable : IEnumerable<string> 
    { 
     private readonly StreamReader _input; 

     public LineReadingEnumerable(StreamReader input) 
     { 
      _input = input; 
     } 
     public IEnumerator<string> GetEnumerator() 
     { 
      return new LineReadingEnumerator(_input); 
     } 
     IEnumerator IEnumerable.GetEnumerator() 
     { 
      return GetEnumerator(); 
     } 
    } 

    public class LineReadingEnumerator : IEnumerator<string> 
    { 
     private readonly StreamReader _input; 
     private string _current; 

     public LineReadingEnumerator(StreamReader input) 
     { 
      _input = input; 
     } 
     public void Dispose() 
     { 
      _input.Dispose(); 
     } 
     public bool MoveNext() 
     { 
      _current = _input.ReadLine(); 
      return (_current != null); 
     } 
     public void Reset() 
     { 
      throw new NotSupportedException(); 
     } 
     public string Current 
     { 
      get { return _current; } 
     } 
     object IEnumerator.Current 
     { 
      get { return _current; } 
     } 
    } 

    public class ChunkingEnumerable<T> : IEnumerable<IReadOnlyList<T>> 
    { 
     private readonly IEnumerable<T> _inner; 
     private readonly int _length; 

     public ChunkingEnumerable(IEnumerable<T> inner, int length) 
     { 
      _inner = inner; 
      _length = length; 
     } 
     public IEnumerator<IReadOnlyList<T>> GetEnumerator() 
     { 
      return new ChunkingEnumerator<T>(_inner.GetEnumerator(), _length); 
     } 
     IEnumerator IEnumerable.GetEnumerator() 
     { 
      return this.GetEnumerator(); 
     } 
    } 

    public class ChunkingEnumerator<T> : IEnumerator<IReadOnlyList<T>> 
    { 
     private readonly IEnumerator<T> _inner; 
     private readonly int _length; 
     private IReadOnlyList<T> _current; 
     private bool _endOfInner; 

     public ChunkingEnumerator(IEnumerator<T> inner, int length) 
     { 
      _inner = inner; 
      _length = length; 
     } 
     public void Dispose() 
     { 
      _inner.Dispose(); 
      _current = null; 
     } 
     public bool MoveNext() 
     { 
      var currentBuffer = new List<T>(); 

      while (currentBuffer.Count < _length && !_endOfInner) 
      { 
       if (!_inner.MoveNext()) 
       { 
        _endOfInner = true; 
        break; 
       } 

       currentBuffer.Add(_inner.Current); 
      } 

      if (currentBuffer.Count > 0) 
      { 
       _current = currentBuffer; 
       return true; 
      } 

      _current = null; 
      return false; 
     } 
     public void Reset() 
     { 
      _inner.Reset(); 
      _current = null; 
      _endOfInner = false; 
     } 
     public IReadOnlyList<T> Current 
     { 
      get 
      { 
       if (_current != null) 
       { 
        return _current; 
       } 

       throw new InvalidOperationException(); 
      } 
     } 
     object IEnumerator.Current 
     { 
      get 
      { 
       return this.Current; 
      } 
     } 
    } 
} 
+0

ここで重要なことは、ループが完了すると 'Parallel.ForEach'が返され、長時間実行されている操作でUIスレッドをブロックする可能性があるということです。これを超える方法の1つは、最初の呼び出しメソッドで新しいタスクをスピンオフすることです。 – ColinM

+0

何かが欠けていない限り、このチャンクの実装はスレッドセーフではありません。 –

+0

スレッドセーフではありません@エリックJ、それはする必要はありません。 Parallel.ForEachは並列スレッド間で列挙型を分割するときにスレッドの安全性を考慮します(System.Collections.Concurrent.PartitionerとそのネストされたクラスInternalPartitionEnumerableおよびInternalPartitionEnumeratorを参照)。 –

2

MoreLinqのBatchメソッドを使用すると、IEnumerable<string>のコレクションが作成され、100の行バッチサイズが含まれ、100行ごとに新しいタスクが回転します。

Semaphoreを使用すると、特定の時間に一定量のタスクのみを実行し、File.ReadAllLinesのパフォーマンスが500,000,000行のパフォーマンスにどのような影響を及ぼすかがわかります。あなたは組み込みのTPLからParallel.ForEachを使用して(下記参照)列挙子のカップルを書いた場合、追加のライブラリを使用して

public class FileProcessor 
{ 
    public async Task ProcessFile() 
    { 
     List<Task> tasks = new List<Task>(); 
     var lines = File.ReadAllLines("File.txt").Batch(100); 
     foreach (IEnumerable<string> linesBatch in lines) 
     { 
      IEnumerable<string> localLinesBatch = linesBatch; 
      Task task = Task.Factory.StartNew(() => 
      { 
       // Perform operation on localLinesBatch 
      }); 
      tasks.Add(task); 
     } 

     await Task.WhenAll(tasks); 
    } 
} 

public static class LinqExtensions 
{ 
    public static IEnumerable<IEnumerable<TSource>> Batch<TSource>(
       this IEnumerable<TSource> source, int size) 
    { 
     TSource[] bucket = null; 
     var count = 0; 

     foreach (var item in source) 
     { 
      if (bucket == null) 
       bucket = new TSource[size]; 

      bucket[count++] = item; 
      if (count != size) 
       continue; 

      yield return bucket; 

      bucket = null; 
      count = 0; 
     } 

     if (bucket != null && count > 0) 
      yield return bucket.Take(count); 
    } 
} 
関連する問題