2016-12-02 1 views
1

私はに新規です。TPL Dataflowと私は均等に分散されたソースメッセージのリストを分割できる構造を探しています個々のパイプラインを介してメッセージメッセージの順序を維持しながら並列処理を行う。これを達成するために使用できるDataFlow API内の特定のブロックまたはコンセプトがあるか、または既存のブロック間にグルーコードまたはカスタムブロックを提供することが重要ですか?TPLデータフローを使用してメッセージを一貫性のあるセットにルーティング、グループ化、または分割する方法

Akka.NETに慣れている人には、ConsistentHashing routerと似た機能を探しています。これらの機能は、単一のルータにメッセージを送信し、これらのメッセージを個々のルートに転送して処理されるようにします。

同期例:私はあなたが探しているものはないと思う一般的に

var count = 100000; 
var processingGroups = 5; 
var source = Enumerable.Range(1, count); 

// Distribute source elements consistently and evenly into a specified set of groups (ex. 5) so that. 
var distributed = source.GroupBy(s => s % processingGroups); 

// Within each of the 5 processing groups go through each item and add 1 to it 
var transformed = distributed.Select(d => d.Select(i => i + 3).ToArray()); 

List<int[]> result = transformed.ToList(); 
Check.That(result.Count).IsEqualTo(processingGroups); 
for (int i = 0; i < result.Count; i++) 
{ 
    var outputGroup = result[i]; 

    var expectedRange = Enumerable.Range(i + 1, count/processingGroups).Select((e, index) => e + (index * (processingGroups - 1)) + 3); 
    Check.That(outputGroup).ContainsExactly(expectedRange); 
} 

答えて

1

があらかじめ作られて、それはConsistentHashingルータとなり得るようなデータフローに。しかし、流したいデータにIDを追加することで、それらを並行して任意の順序で処理し、処理が完了したときに並べ替えることができます。

public class Message { 
     public int MessageId { get; set; } 
     public int GroupId { get; set; }   
     public int Value { get; set; } 
    } 

    public class MessageProcessing 
    { 
     public void abc() { 
      var count = 10000; 
      var groups = 5; 
      var source = Enumerable.Range(0, count); 

      //buffer all input 
      var buffer = new BufferBlock<IEnumerable<int>>(); 

      //split each input enumerable into processing groups 
      var messsageProducer = new TransformManyBlock<IEnumerable<int>, Message>(ints => 
      ints.Select((i, index) => new Message() { MessageId = index, GroupId = index % groups, Value = i }).ToList()); 

      //process each message, one action block may process any group id in any order 
      var processMessage = new TransformBlock<Message, Message>(msg => 
      { 
       msg.Value++; 
       return msg; 
      }, new ExecutionDataflowBlockOptions() { 
       MaxDegreeOfParallelism = groups 
      }); 

      //output of processed message values 
      int[] output = new int[count]; 

      //insert messages into array in the order the started in 
      var regroup = new ActionBlock<Message>(msg => output[msg.MessageId] = msg.Value, 
       new ExecutionDataflowBlockOptions() { 
        MaxDegreeOfParallelism = 1 
       }); 
     }   

    } 

この例では、メッセージのGroupIdは使用されませんが、メッセージのグループを調整するためのより完全な例で使用できます。また、バッファブロックへのフォローアップポストの処理は、出力配列をリストに変更し、整数の列挙がバッファブロックにポストされるたびに、対応するリスト要素を設定することによって行うことができます。正確な使用状況に応じて、出力の複数のユーザーをサポートする必要があり、これをフローに折り返すことができます。

0

あなたは動的述語に基づいてlinking the blocks between each otherとのパイプラインを作成することができます。

var count = 100; 
var processingGroups = 5; 
var source = Enumerable.Range(1, count); 

var buffer = new BufferBlock<int>(); 
var consumer1 = new ActionBlock<int>(i => { }); 
var consumer2 = new ActionBlock<int>(i => { }); 
var consumer3 = new ActionBlock<int>(i => { }); 
var consumer4 = new ActionBlock<int>(i => { Console.WriteLine(i); }); 
var consumer5 = new ActionBlock<int>(i => { }); 

buffer.LinkTo(consumer1, i => i % 5 == 1); 
buffer.LinkTo(consumer2, i => i % 5 == 2); 
buffer.LinkTo(consumer3, i => i % 5 == 3); 
buffer.LinkTo(consumer4, i => i % 5 == 4); 
buffer.LinkTo(consumer5); 

foreach (var i in source) 
{ 
    buffer.Post(i); 
    // consider async option if you able to do it 
    // await buffer.SendAsync(i); 
} 
buffer.Complete(); 
Console.ReadLine(); 

上記のコードは、静かに他のグループを処理して、第4グループから数字だけを書きますが、私はあなたのアイデアを得たいと考えています。少なくとも1つのコンシューマのブロックをリンクせずにメッセージをフィルタリングすることは、コンシューマによって受け入れられない場合にはドロップされません。また、デフォルトのハンドラがない場合にはこれを行うことができます(NullTarget<int>は、それが得たメッセージ):

buffer.LinkTo(DataflowBlock.NullTarget<int>()); 

これの欠点は、それの利点の続きです。このためのビルトイン構造が存在しないとして、あなたは、述語を提供する必要があります。しかし、それでもやり遂げることができます。

関連する問題