2016-05-02 13 views
2

私の目標は、1つのストリームから読み込み、そのストリームを変換し、それを読み取るためにStreamを受け入れるライブラリへの入力として使用することです。どのようにOutputStreamをC#でInputStreamとして渡しますか?

私は2つの異なるライブラリを使用しています。 1つは出力Streamを受け取り、変換します。それをTransformingOutputStreamとしましょう。その使用目的は次のとおりです。Streamを受け入れる別のライブラリを使用しています。それは必要なものを何でもして、そのストリームから読み込みます。その使用目的は次のとおりです。

MagicStreamReadingLibrary.ProcessStream(someInputStream); 

その使用目的が書かれ-に、読み取りからではないことがあるので、私はそれにTransformingOutputStreamを渡すことはできません。どちらのライブラリも制御できません。

は、どのように私はフックアップしますTransformingOutputStreamを入力Streamからを読んを必要とするライブラリ関数に?

答えて

2

は、これまでのところ、これは匿名のパイプを使用して、私が持っている最高の実施例である:

using(var pipeServer = new AnonymousPipeServerStream(PipeDirection.Out)) { 
    var pipeServerTask = Task.Run(
     async() => { 
      using(var stream = getInputStream()) { 
       await stream.CopyToAsync(new TransformingOutputStream(pipeServer)); 
      } 
      pipeServer.WaitForPipeDrain(); 
      pipeServer.Dispose(); 
     }); 

    using(var client = new AnonymousPipeClientStream(PipeDirection.In, pipeServer.ClientSafePipeHandle)) { 
     MagicStreamReadingLibrary.ProcessStream(client); 
    } 
    pipeServerTask.Wait(); 
} 
+1

ああ、それは私の考えよりずっと簡単です。私はそれに行くだろう。 –

1

フラットファイルに書き込んでから読み返してください。

+1

あなたが技術的に間違っていませんよ!しかし、ストリーミングできるようにできるかどうかを見てみましょう。そのストリームがどれくらいの間、誰が知っているのですか? –

0

ここに私が一緒に投げたものがあります。理論的には(テストされていない、正しくコンパイルされていることがわかります)それはあなたのユースケースに働くだろう

public class BufferingStream 
{ 
    private readonly Stream _readingStream; 
    private readonly Stream _writingStream; 
    private BlockingCollection<byte[]> _buffer; 

    public BufferingStream() 
    { 
     _buffer = new BlockingCollection<byte[]>(new ConcurrentQueue<byte[]>()); 
     _readingStream = new InternalReadingStream(_buffer); 
     _writingStream = new InternalWritingStream(_buffer); 
    } 

    public BufferingStream(int maxQueueLength) 
    { 
     _buffer = new BlockingCollection<byte[]>(new ConcurrentQueue<byte[]>(), maxQueueLength); 
     _readingStream = new InternalReadingStream(_buffer); 
     _writingStream = new InternalWritingStream(_buffer); 
    } 
    public Stream GetReadingStream() 
    { 
     return _readingStream; 
    } 

    public Stream GetWritingStream() 
    { 
     return _writingStream; 
    } 

    public int QueueLength 
    { 
     get { return _buffer.Count; } 
    } 

    public class InternalWritingStream : Stream 
    { 
     private readonly BlockingCollection<byte[]> _queuedBytes; 

     public InternalWritingStream(BlockingCollection<byte[]> queuedBytes) 
     { 
      _queuedBytes = queuedBytes; 
     } 

     public override void Write(byte[] buffer, int offset, int count) 
     { 
      byte[] internalBuffer = new byte[count]; 
      Array.Copy(buffer, offset, internalBuffer, 0, count); 
      _queuedBytes.Add(internalBuffer); 
     } 

     public override void Close() 
     { 
      _queuedBytes.CompleteAdding(); 
      base.Close(); 
     } 

     public override void Flush() 
     { 
     } 

     public override long Seek(long offset, SeekOrigin origin) 
     { 
      throw new NotSupportedException(); 
     } 

     public override void SetLength(long value) 
     { 
      throw new NotSupportedException(); 
     } 

     public override int Read(byte[] buffer, int offset, int count) 
     { 
      throw new NotSupportedException(); 
     } 

     public override bool CanRead 
     { 
      get { return false; } 
     } 

     public override bool CanSeek 
     { 
      get { return false; } 
     } 

     public override bool CanWrite 
     { 
      get { return true; } 
     } 

     public override long Length 
     { 
      get { throw new NotSupportedException(); } 
     } 

     public override long Position 
     { 
      get { throw new NotSupportedException(); } 
      set { throw new NotSupportedException(); } 
     } 
    } 

    private sealed class InternalReadingStream : Stream 
    { 
     private readonly BlockingCollection<byte[]> _queuedBytes; 
     private byte[] _currentItem; 
     private int _currentItemOffset; 

     public InternalReadingStream(BlockingCollection<byte[]> queuedBytes) 
     { 
      _queuedBytes = queuedBytes; 
      _currentItem = new byte[0]; 
      _currentItemOffset = 0; 
     } 

     public override int Read(byte[] buffer, int offset, int count) 
     { 
      if (_currentItemOffset == _currentItem.Length) 
      { 
       //Try to take the next buffer, if we can't take a item it means we where done adding from the source. 
       var taken = _queuedBytes.TryTake(out _currentItem, Timeout.Infinite); 
       if (!taken) 
        return 0; 

       _currentItemOffset = 0; 
      } 
      var bytesToRead = Math.Min(count, _currentItem.Length - _currentItemOffset); 
      Array.Copy(_currentItem, _currentItemOffset, buffer, offset, bytesToRead); 
      _currentItemOffset += bytesToRead; 

      return bytesToRead; 
     } 

     public override void Flush() 
     { 
     } 

     public override long Seek(long offset, SeekOrigin origin) 
     { 
      throw new NotSupportedException(); 
     } 

     public override void SetLength(long value) 
     { 
      throw new NotSupportedException(); 
     } 

     public override void Write(byte[] buffer, int offset, int count) 
     { 
      throw new NotSupportedException(); 
     } 

     public override bool CanRead 
     { 
      get { return true; } 
     } 

     public override bool CanSeek 
     { 
      get { return false; } 
     } 

     public override bool CanWrite 
     { 
      get { return false; } 
     } 

     public override long Length 
     { 
      get { throw new NotSupportedException(); } 
     } 

     public override long Position 
     { 
      get { throw new NotSupportedException(); } 
      set { throw new NotSupportedException(); } 
     } 
    } 
} 

方法は

var bufferingStream = new BufferingStream(); 

Task.Run(() => 
{ 
    using(var inputStream = GetTheStreamFromSomewhere(); 
    using(var finalDestinationOutputStream = bufferingStream.GetWritingStream()) 
    using(var outputStream = new TransformingOutputStream(finalDestinationOutputStream)) 
    { 
     inputStream.CopyTo(outputStream); 
    } 
} 

using(var someInputStream = bufferingStream.GetReadingStream()) //Technically a using is not necessary on the reading stream but it is good to keep good habits. 
{ 
    MagicStreamReadingLibrary.ProcessStream(someInputStream); 
} 

だろう。最初のデータが利用可能になるまでProcessStreamますがブロックされます.Read(呼び出しに呼び出します。バイトが利用可能になると、.Read(はデータをブロック解除して渡します。 finalDestinationOutputStreamが削除されると、完了したとしてキューがマークされます。outputStreamは最後の読み取りを終了すると、それ以降の呼び出しでは0が返されます。

あなたの作家があなたのリーダーよりもはるかに高速であることがわかったら、読者が読む機会があるまで書き込みがブロックされるように最大キュー長を渡すことができます。

関連する問題