2017-11-06 4 views
2

私はDataFlowExを使用していますが、例外がスローされた場合にDataFlow全体をシャットダウンすることはできません。1つのブロックに障害が発生したときに、データフローネットワーク全体をシャットダウンすることを避けてください。

タスクがランダムに発生するシステムがあります。ネットワークに障害を記録し、その特定のタスクを放棄して他のタスクの実行を続行したいと考えています。 TPLとDataFlowEx両方のドキュメントを読んで

、特に

It [a faulted block] should decline any further incoming messages. Here

DataflowEx takes a fast-fail approach on exception handling just like TPL Dataflow. When an exception is thrown, the low-level block ends to the Faulted state first. Then the Dataflow instance who is the parent of the failing block gets notified. It will immediately propagate the fatal error: notify its other children to shutdown immediately. After all its children is done/completed, the parent Dataflow also comes to its completion, with the original exception wrapped in the CompletionTask whose status is also Faulted. Here

のようなものは、それはほとんどの障害からの移動ブロックのように思えるが意図されていない...

私のフローがたくさん含まファイルIOと私はときどき例外が発生することを期待しています(読み取り/書き込み中にネットワークボリュームがオフラインになる、接続の失敗、権限の問題など...)

パイプライン全体が枯渇しないようにしたいと思います。ここで

は、私が働いているコードの例です。ここで

using Gridsum.DataflowEx; 
using System; 
using System.IO; 
using System.Threading.Tasks.Dataflow; 

namespace DataManagementSystem.Data.Pipeline.Actions 
{ 
    class CopyFlow : Dataflow<FileInfo, FileInfo> 
    { 
     private TransformBlock<FileInfo, FileInfo> Copier; 
     private string destination; 

     public CopyFlow(string destination) : base(DataflowOptions.Default) 
     { 
      this.destination = destination; 

      Copier = new TransformBlock<FileInfo, FileInfo>(f => Copy(f)); 

      RegisterChild(Copier);    
     } 

     public override ITargetBlock<FileInfo> InputBlock { get { return Copier; } } 

     public override ISourceBlock<FileInfo> OutputBlock { get { return Copier; } } 

     protected virtual FileInfo Copy(FileInfo file) 
     { 
      try 
      { 
       return file.CopyTo(Path.Combine(destination, file.Name)); 
      } 
      catch(Exception ex) 
      { 
       //Log the exception 
       //Abandon this unit of work 
       //resume processing subsequent units of work 
      } 

     } 
    } 
} 

は、私がパイプラインに作業を送ってる方法です:

var result = pipeline.ProcessAsync(new[] { file1, file2 }).Result; 

答えて

1

それならばブロックが故障となりますExceptionがスローされます。パイプラインを失敗させたくない場合は、完了を伝播したり、Exceptionを処理したりすることはできません。例外の処理には多くの形がありますが、必要なのは簡単な再試行であると思われます。 try/catchを使用し、独自の再試行ループを実装するか、Pollyのようなものを使用することができます。簡単な例を以下に示します。

注:このコードはテストされていませんが、正しい方向に進むはずです。

編集

あなたはほとんどあなたが必要なすべてを持っています。例外をキャッチしてログに記録すると、nullまたはパイプラインからフィルタリングしてNullTargetにフィルタすることができる他のマーカーを返すことができます。このコードではNullTargetフィルタリングリンクがCopierの最初のリンクであることが保証されています。したがって、nullは実際の宛先にはなりません。

class CopyFlow : Dataflow<FileInfo, FileInfo> { 
    private TransformBlock<FileInfo, FileInfo> Copier; 
    private string destination; 

    public CopyFlow(string destination) : base(DataflowOptions.Default) { 
     this.destination = destination; 

     Copier = new TransformBlock<FileInfo, FileInfo>(f => Copy(f)); 
     Copier.LinkTo(DataflowBlock.NullTarget<FileInfo>(), info => info == null); 

     RegisterChild(Copier); 
    } 

    public override ITargetBlock<FileInfo> InputBlock { get { return Copier; } } 

    public override ISourceBlock<FileInfo> OutputBlock { get { return Copier; } } 

    protected virtual FileInfo Copy(FileInfo file) { 
     try { 
      return file.CopyTo(Path.Combine(destination, file.Name)); 
     } catch(Exception ex) { 
      //Log the exception 
      //Abandon this unit of work 
      //resume processing subsequent units of work 
      return null; 
     } 

    } 
} 
+0

お返事ありがとうございます。あなたの言うことは興味深いものです。特に「例外を処理する」ことについてです。再試行ではなく、失敗を記録してその作業単位を完全に停止したかったのです。具体的には、私の問題は、実際に*仕事をしている関数の内部で、例外を捕捉する方法 - >エラーを記録する - >その作業単位を放棄する - >何も起こらないかのように続ける方法を理解できないことです。問題は関数呼び出し元が、関数が返ってから 'FileInfo'が復帰することを期待していることです。 –

+0

上記の編集を参照してください –

+0

私は今参照してください。私の混乱は、「NullTarget」の使用を完全に理解していないことから私からのものでした。私はそれを読み上げます、ありがとう! P.S.ポリは絶対に華麗に見える!私はすぐに、私が必要としていたものをすぐに使い始めるつもりです。 –

関連する問題