2013-01-22 10 views
5

私はここで苦労しています。通常、私は本を読んだが、まだ何もない。私は、RXを使ってストリームを読むこととはさまざまなことの無数の例を見出しましたが、私は頭を悩ますのが非常に難しいと感じています。最後にストリームを読み込むObservableを作成する適切な方法は何ですか

Observable.FromAsyncPatternを使用して、ストリームのBeginRead/EndReadメソッドまたはBeginReadLine/EndReadLineメソッドのラッパーを作成することができます。

しかし、これは最初の観察者が購読したときに一度だけ読み込みます。

ストリームのエラーまたは終了までOnNextを読み込み、ポンピングするObservableが必要です。

これに加えて、私はどのようにして複数のサブスクライバと観測値を共有してアイテムを取得できるのかを知りたいと思います。

+3

偉大な記事ですが、大量に古いです。これは古いAPIを参照しています。この章では、これらの概念を取り上げ、より新しいバージョンのRxで動作させるようにしています。 * http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#CreatingYourOwnIterator –

+0

ああ、そのリンクについて知ったことはありません@LeeCampbell - とてもいいです! – JerKimball

+0

この質問には何が起こったのですか?以前の答えはどこにありますか? –

答えて

1

ソリューションは、ここで

をObservable.Create使用することです(接続を呼び出すことを忘れないでくださいストリームの任意の種類

public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader) 
    { 

     return Observable.Create<Command>(async (subject, token) => 
     { 


      try 
      { 

       while (true) 
       { 

        if (token.IsCancellationRequested) 
        { 
         subject.OnCompleted(); 
         return; 
        } 

        //this part here can be changed to something like this 
        //int received = await Task.Factory.FromAsync<int>(innerSocket.BeginReceive(data, offset, size, SocketFlags.None, null, null), innerSocket.EndReceive); 

        Command cmd = await reader.ReadCommandAsync(); 

        subject.OnNext(cmd); 

       } 

      } 

      catch (Exception ex) 
      { 
       try 
       { 
        subject.OnError(ex); 
       } 
       catch (Exception) 
       { 
        Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere"); 
        throw; 
       } 
      } 

     }).Publish(); 

    } 

を読み取るためadapatedことができる一例です)返さIConnectableObservable

3

rxxを使用して、李氏の答えに追加:

using (new FileStream(@"filename.txt", FileMode.Open) 
     .ReadToEndObservable() 
     .Subscribe(x => Console.WriteLine(x.Length))) 
{ 
    Console.ReadKey(); 
} 

リードバッファの長さが出力されます。

+0

Leeの答えはどこですか? –

+0

何らかの理由で消えてしまった –

+0

実際に質問への回答がなかったため、彼の答えは削除されました。[彼の]本への単なるリンク –

1

へぇ - つもりリユースここに私の他の回答(それのほか、一部、とにかく)の1:

参考:

:それは Reading from NetworkStream corrupts the buffer

が、私はこのような拡張メソッドを持っています

public static class Ext 
{   
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize) 
    {   
     // to hold read data 
     var buffer = new byte[bufferSize]; 
     // Step 1: async signature => observable factory 
     var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
      stream.BeginRead, 
      stream.EndRead); 
     return Observable.While(
      // while there is data to be read 
      () => stream.CanRead, 
      // iteratively invoke the observable factory, which will 
      // "recreate" it such that it will start from the current 
      // stream position - hence "0" for offset 
      Observable.Defer(() => asyncRead(buffer, 0, bufferSize)) 
       .Select(readBytes => buffer.Take(readBytes).ToArray())); 
    } 
} 

おそらく、そうのような形式で記述されたとして、これを使用することができます。

// Note: ToEnumerable works here because your filestream 
// has a finite length - don't do this with infinite streams! 
var blobboData = stream 
    .ReadObservable(bufferSize) 
    // take while we're still reading data 
    .TakeWhile(returnBuffer => returnBuffer.Length > 0) 
    .ToEnumerable() 
    // mash them all together 
    .SelectMany(buffer => buffer) 
    .ToArray(); 
+0

こんにちはJerKimball、あなたの解決に感謝します。私はそれを使用して、その後、エラー条件を処理するのが難しいです。あなたは接続を閉じる必要があるときにどのように対処しますか?私はあなたがそれをチェックアウトすることができますかについて質問http://stackoverflow.com/questions/32079292/reading-from-stream-using-observable-through-fromasyncpattern-how-to-close-cancを開いた。 –

+0

Rxxはそれとはまったく異なっているようです。誰かが詳しく説明できますか? https://github.com/RxDave/Rxx/blob/master/Main/Source/Rxx.Bindings-Net451/System/IO/StreamExtensions.cs –

+0

この実装には問題があります。異なる加入者は、ストリームデータの異なるシェアを得るでしょう!すべての加入者がすべてのデータを取得できるように、 '.Publish()。Connect()'を追加する必要があります。 –

1

上のあなた複数のリーダー間の共有を制御するために、ストリームの終わりまでラインを読み取るためにRepeatを使用し、PublishまたはReplayを使用することができます。

端までの任意のストリームから行を読み取るための簡単な、完全な受信溶液の例は、次のようになります

public static IObservable<string> ReadLines(Stream stream) 
{ 
    return Observable.Using(
     () => new StreamReader(stream), 
     reader => Observable.FromAsync(reader.ReadLineAsync) 
          .Repeat() 
          .TakeWhile(line => line != null)); 
} 

この解決策はまた、ReadLine戻りnullの場合、ストリームの終了事実を利用しますが達成された。

関連する問題