2016-11-25 5 views
0

ストリームの流れがあります。"checksum"を一連の.zipファイルで計算します。Observable。破損したデータを取得しているストリームを使用しています。

はそれを行うには、私が観察できるように設定している :

  1. は、各ファイルの内容を読み込み、指定したフォルダ内のすべてのファイルを受け取り、各エントリの
  2. ZipArchiveとして読んで)各ファイルには、

はそれを説明するためにチェックサムの計算を行い、私はこの例を作成しました十分な:

NOTICE Main方法を作るためにAsyncContext.Runhttps://stackoverflow.com/a/9212343/1025407)の使用量がGetChecksumを待って、それはアプリケーションを実行するコンソールアプリケーション

namespace DisposePoC 
{ 
    using System.Collections.Generic; 
    using System.IO; 
    using System.IO.Compression; 
    using System.Reactive.Linq; 
    using Nito.AsyncEx; 
    using System.Linq; 
    using System.Threading.Tasks; 


    class Program 
    { 
     private static void Main() 
     { 
      AsyncContext.Run(GetChecksums); 
     } 

     private static async Task<IList<byte>> GetChecksums() 
     { 
      var bytes = Directory.EnumerateFiles("FolderWithZips") 
       .ToObservable() 
       .SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable())) 
       .SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length)))); 

      return await bytes.ToList(); 
     } 

     private static ZipArchive CreateZipArchive(string path) 
     { 
      return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read)); 
     } 

     private static async Task<byte> CalculateChecksum(Stream stream, long entryLength) 
     { 
      var bytes = await GetBytesFromStream(stream, entryLength); 
      return bytes.Aggregate((b1, b2) => (byte) (b1^b2)); 
     } 

     private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength) 
     { 
      byte[] bytes = new byte[entryLength]; 
      await stream.ReadAsync(bytes, 0, (int)entryLength); 
      return bytes;    
     } 
    } 
} 

なので、私はエラーのすべての種類を取得:

'System.IO.InvalidDataException':ローカルファイルヘッダーが破損しています。 'System.NotSupportedException':ストリームは読み取りをサポートしていません。 'System.ObjectDisposedException':配置されたオブジェクトにアクセスできません。 'System.IO.InvalidDataException':ブロックの長さがその補数と一致しません。

私は間違っていますか?

observable自体に問題はありますか?ZipArchiveはスレッドセーフではないためですか?そうでない場合は、コードをどのように動作させるのですか?

+1

私は、現時点では中・コードを確認することができないので、私はこのコメント作ってあげるが、私は、問題が最初にSelectManyで作成されてZipArchivesは、Usingステートメントによって処分されていることである疑いがあるでしょう次の行の入力ストリームを読む前に、本質的に、使い捨てのスコープは間違っています。私は2番目のSelectManyから最初のロジックにロジックを移動します。最初の例外が示すように、テストデータが破損していないことを確認します。 – Andrew

+0

私はあなたの意見を見ると思います。しかし、範囲が間違っている場合、すべてのエントリーが処理されるまで各ZipArchiveの処分を避けるためにコードを修正する必要がありますか?それも可能ですか? – SuperJMN

答えて

1

あなたの問題について「Rx」は何もないようです。あなたは、ループの不可欠セットに全体のことを国防省場合

それはだから私はあなたが競合条件(同時性)および/または注文の処分の問題のうちのセットを持っている想像

private static async Task<IList<byte>> GetChecksums() 
{ 
    var bytes = new List<byte>(); 
    foreach (var path in Directory.EnumerateFiles("FolderWithZips")) 
    { 
     using (var archive = CreateZipArchive(path)) 
     { 
      foreach (var entry in archive.Entries) 
      { 
       using (var stream = entry.Open()) 
       { 
        var checksum = await CalculateChecksum(stream, entry.Length); 
        bytes.Add(checksum); 
       } 
      } 
     } 
    } 

    return bytes; 
} 

正常に動作します。

+0

Observable.Usingはストリームの処理を正しい順序で処理すると考えていたので、ObjectDisposedExceptionsを取得しません。間違って使っているのですか、それとも本質的に問題の性質に関係している問題ですか? (ZipArchiveから同時に読み取る) – SuperJMN

+1

Observable.Usingは、シーケンスが終了すると(dispose/error/complete)、提供されたファクトリによって作成されたリソースを破棄します。しかし、あなたはRxをReactiveではない問題に強制しているので、それはすべて学問的なものです。それは2つの(不必要な)ToObservable()呼び出しにISchedulerを提供しないことで、スレッドの不要な導入という周辺問題が発生することが主な問題です。 –

2

Rxはおそらくこれに最適ではありません。正直なところ、非同期でなくても実行できます。

Directory.EnumerateFiles("FolderWithZips") 
     .AsParallel() 
     .Select(folder => CalculateChecksum(folder)) 
     .ToList() 
+0

まあ、CalculateChecksumは単なる質問の例です。私の実際の人生の問題では、私が変更できない非同期メソッド(サードパーティ)です。どのようにあなたのアプローチを変更するのですか? (非同期) – SuperJMN

関連する問題