2016-10-31 7 views
2

私はTPL Dataflowをプロダクションコードに移植する前に実験しています。 プロダクションコードは古典的なプロデューサ/コンシューマシステムです。プロデューサは(金融ドメインに関連して)メッセージを生成し、コンシューマはそれらのメッセージを処理します。TPLデータフロー - 非常に速いプロデューサー、非常に高速なコンシューマーOutOfMemory例外

私が興味を持っているのは、ある時点でプロデューサーが消費者よりもはるかに速く生産するならば、安定した環境が維持されることです(システムが爆発する、または何が起こるか)&もっと重要なことそれらの場合に行う。

同様の単純なアプリケーションを作成しようとすると、次のように思い付きます。

var bufferBlock = new BufferBlock<Item>(); 

    var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions 
         { 
          MaxDegreeOfParallelism = Environment.ProcessorCount 
          , 
          BoundedCapacity = 100000 
         }; 

     var dataFlowLinkOptions = new DataflowLinkOptions 
         { 
          PropagateCompletion = true 
         }; 

     var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t), 
executiondataflowBlockOptions); 

      bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions); 
      for (int i = 0; i < int.MaxValue; i++) 
      { 
       bufferBlock.SendAsync(GenerateItem()); 
      } 

      bufferBlock.Complete(); 
      Console.ReadLine(); 

Itemは、単にニュースがアップItem

static Item GenerateItem() 
{ 
    return new Item(Guid.NewGuid().ToString()); 
} 

、模倣する非常に単純なクラス

internal class Item 
    { 
     public Item(string itemId) 
     { 
      ItemId = itemId; 
     } 

     public string ItemId { get; } 
    } 

GenerateItemある消費者はそれほど速くない - 私は100msのために保持するためにProcessItemを作りました。

static async Task ProcessItem(Item item) 
{ 
    await Task.Delay(TimeSpan.FromMilliseconds(100)); 
    Console.WriteLine($"Processing #{item.ItemId} item."); 
} 

これを実行すると、20分ほどでOOM例外が発生します。

次に、より多くの消費者(より多くのActionBlocksを10個)を追加しました。これはもう少し時間がかかりますが、最終的には同じOOM例外が発生します。

は、私はまた、GCが(VS 2015人の診断ツールは、GCは、ほぼすべての時間を実行していることを示す)の巨大な圧力下にあることに気づいたので、私はItemのオブジェクトプーリング(非常にシンプルなもの、基本的にそれはConcurrentBagが保存される項目)を導入したが、それでも私は同じ壁に当たっています(OOM例外がスローされます)。

何がメモリ上にあるのかについていくつかの詳細を示すには、メモリが不足している理由を説明してください。

  • 最大の大きさは、私がBufferBlockさんINPUTBUFFERがActionBlock(S)のために私のセットアップBoundedCapacityのでItem S(カウント= 14562296)
  • の完全であることがわかりSingleProducerSingleConsumerQueue+Segment<TplDataFlow.Item> & ConcurrentQueue+Segment<TplDataFlow.Item>
  • 型のオブジェクトを持って、その入力バッファも設定された番号に近いです(InputCount = 99,996)

より遅いプロデューサが消費者のために可能になるようにするにはを追いつく、私は繰り返しの間スリープするプロデューサー製:

for (int i = 0; i < int.MaxValue; i++) 
{ 
    Thread.Sleep(TimeSpan.FromMilliseconds(50)); 
    bufferBlock.SendAsync(GenerateItem()); 
} 

を、それが正常に動作します - 例外がスローされていない、メモリの使用率が常に低いですが、私はもはや任意のGCの圧力が表示されません。TPLデータフロービルディングブロック

  • と非常に高速なプロデューサー/スロー消費者(複数可)のシナリオを再現しようとしているときに

    だから私は持っているいくつかの質問

    1. は、私がどのような方法にはあり、本質的に何も悪いことをしていますこの作業を行い、OOM例外で失敗しないようにします。
    2. TPLデータフローコンテキスト内でこの種のシナリオ(非常に高速なプロデューサ/低速コンシューマ)を処理する方法に関するベストプラクティスに関するコメント/リンク。消費者が追いつくことができないので、BufferBlockの内部バッファは、非常に高速なメッセージで埋めなっており、消費者の一部は、として次のメッセージをお願いして戻ってくるまでメッセージをオフに保持している - 問題の
    3. 私の理解では、結果アプリケーションのメモリが不足しています(内部バッファーがいっぱいになっているのでBufferBlock) - これに同意しますか?

    私はMicrosoft.Tpl.Dataflowパッケージ-version 4.5.24を使用しています。 .NET 4.5(C#6)。プロセスは32ビットです。

  • 答えて

    5

    この問題は、BufferBlockがOOMに当たるまで入力バッファを満たしていることがわかりました。

    この問題を解決するには、BoundedCapacityオプションをバッファブロックに追加する必要があります。これにより、プロデューサが自動的に調整されます(プロデューサのThread.Sleepは不要です)。

    +0

    確認してください!あなたはBufferBlockの 'BoundedCapacity'の動作についてコメントしますか?特に内部バッファがいっぱいになると、着信メッセージで何が起こるのでしょうか?彼らは落ちるだろうか? (現時点でそうであるように思われる)。あなたはこれをあなたの本スティーブンの詳細で詳しく説明していますか? – Michael

    +0

    @Michael:メッセージは決して落とされません。メッセージをフルブロックに 'Post'すると' false'が返されます(メッセージが受け入れられなかったことを示します)。完全なブロックへの 'SendAsync'メッセージを待っている場合、それは(非同期的に)部屋があるのを待ってからメッセージを投稿します。 [この回答](http://stackoverflow.com/a/13605979/263693)を参照してください。 –

    +0

    ありがとう@StephenCleary! – Michael

    関連する問題