私はTPL Dataflowをプロダクションコードに移植する前に実験しています。 プロダクションコードは古典的なプロデューサ/コンシューマシステムです。プロデューサは(金融ドメインに関連して)メッセージを生成し、コンシューマはそれらのメッセージを処理します。TPLデータフロー - 非常に速いプロデューサー、非常に高速なコンシューマーOutOfMemory例外
私が興味を持っているのは、ある時点でプロデューサーが消費者よりもはるかに速く生産するならば、安定した環境が維持されることです(システムが爆発する、または何が起こるか)&もっと重要なことそれらの場合に行う。
同様の単純なアプリケーションを作成しようとすると、次のように思い付きます。
var bufferBlock = new BufferBlock<Item>();
var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
,
BoundedCapacity = 100000
};
var dataFlowLinkOptions = new DataflowLinkOptions
{
PropagateCompletion = true
};
var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
executiondataflowBlockOptions);
bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
for (int i = 0; i < int.MaxValue; i++)
{
bufferBlock.SendAsync(GenerateItem());
}
bufferBlock.Complete();
Console.ReadLine();
Item
は、単にニュースがアップItem
今
static Item GenerateItem()
{
return new Item(Guid.NewGuid().ToString());
}
、模倣する非常に単純なクラス
internal class Item
{
public Item(string itemId)
{
ItemId = itemId;
}
public string ItemId { get; }
}
GenerateItem
ある消費者はそれほど速くない - 私は100ms
のために保持するためにProcessItem
を作りました。
static async Task ProcessItem(Item item)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
Console.WriteLine($"Processing #{item.ItemId} item.");
}
これを実行すると、20分ほどでOOM例外が発生します。
次に、より多くの消費者(より多くのActionBlocksを10個)を追加しました。これはもう少し時間がかかりますが、最終的には同じOOM例外が発生します。
は、私はまた、GCが(VS 2015人の診断ツールは、GCは、ほぼすべての時間を実行していることを示す)の巨大な圧力下にあることに気づいたので、私はItem
のオブジェクトプーリング(非常にシンプルなもの、基本的にそれはConcurrentBagが保存される項目)を導入したが、それでも私は同じ壁に当たっています(OOM例外がスローされます)。
何がメモリ上にあるのかについていくつかの詳細を示すには、メモリが不足している理由を説明してください。
- 最大の大きさは、私が
BufferBlock
さんINPUTBUFFERがActionBlock
(S)のために私のセットアップBoundedCapacity
のでItem
S(カウント= 14562296) - の完全であることがわかり
SingleProducerSingleConsumerQueue+Segment<TplDataFlow.Item>
&ConcurrentQueue+Segment<TplDataFlow.Item>
- 型のオブジェクトを持って、その入力バッファも設定された番号に近いです(InputCount = 99,996)
より遅いプロデューサが消費者のために可能になるようにするにはを追いつく、私は繰り返しの間スリープするプロデューサー製:
for (int i = 0; i < int.MaxValue; i++)
{
Thread.Sleep(TimeSpan.FromMilliseconds(50));
bufferBlock.SendAsync(GenerateItem());
}
を、それが正常に動作します - 例外がスローされていない、メモリの使用率が常に低いですが、私はもはや任意のGCの圧力が表示されません。TPLデータフロービルディングブロック
だから私は持っているいくつかの質問
- は、私がどのような方法にはあり、本質的に何も悪いことをしていますこの作業を行い、OOM例外で失敗しないようにします。
- TPLデータフローコンテキスト内でこの種のシナリオ(非常に高速なプロデューサ/低速コンシューマ)を処理する方法に関するベストプラクティスに関するコメント/リンク。消費者が追いつくことができないので、
BufferBlock
の内部バッファは、非常に高速なメッセージで埋めなっており、消費者の一部は、として次のメッセージをお願いして戻ってくるまでメッセージをオフに保持している - 問題の - 私の理解では、結果アプリケーションのメモリが不足しています(内部バッファーがいっぱいになっているので
BufferBlock
) - これに同意しますか?
私はMicrosoft.Tpl.Dataflow
パッケージ-version 4.5.24を使用しています。 .NET 4.5(C#6)。プロセスは32ビットです。
確認してください!あなたはBufferBlockの 'BoundedCapacity'の動作についてコメントしますか?特に内部バッファがいっぱいになると、着信メッセージで何が起こるのでしょうか?彼らは落ちるだろうか? (現時点でそうであるように思われる)。あなたはこれをあなたの本スティーブンの詳細で詳しく説明していますか? – Michael
@Michael:メッセージは決して落とされません。メッセージをフルブロックに 'Post'すると' false'が返されます(メッセージが受け入れられなかったことを示します)。完全なブロックへの 'SendAsync'メッセージを待っている場合、それは(非同期的に)部屋があるのを待ってからメッセージを投稿します。 [この回答](http://stackoverflow.com/a/13605979/263693)を参照してください。 –
ありがとう@StephenCleary! – Michael