2012-02-24 6 views
2

現在、ネットワークストリームをリッスンし、新しいメッセージがデシリアライズされたときにイベントを発生させるプログラムがあります。System.Reactiveを使用してメッセージをデシリアライズする

while(true) 
{ 
    byte[] lengthBytes = new byte[10]; 
    networkStream.Read(lengthBytes, 0, 10); 
    int messageLength = Int32.Parse(Encoding.UTF8.GetString(lengthBytes)); 
    var messageBytes = new byte[messageLength + 10]; 
    Array.Copy(lengthBytes, messageBytes, 10); 
    int bytesReadTotal = 10; 
    while (bytesReadTotal < 10 + messageLength) 
    bytesReadTotal += networkStream.Read(messageBytes, bytesReadTotal, messageLength - bytesReadTotal + 10); 
    OnNewMessage(new MessageEventArgs(messageFactory.GetMessage(messageBytes))); 
} 

私の代わりにイベントのIObservable<Message>があるように、反応性の拡張機能を使用して、これを書き直したいです。これは、

を使用して行うことができますが、代わりにSystem.Reactiveを使用してリスニングプロセスを書き直すことをお勧めします。 (hereから)私の出発点は、私も継続する方法を確認するために苦労してい

byte[] lengthBytes = new byte[10]; 
read(lengthBytes, 0, lengthBytes.Length).Subscribe(
{ 
    (bytesRead) => ; 
}); 

ことができます

Func<byte[], int, int, IObservable<int>> read; 
read = Observable.FromAsyncPattern<byte[], int, int, int>(
networkStream.BeginRead, 
networkStream.EndRead); 

です。誰にも実装がありますか?

答えて

1

私は次のことを考え出しましたが、クラスを作成せずにメッセージをオブジェクトパケットにヘッダパケットを投射するなどしてSubject<T>を使用しなくても可能であると感じましたが、問題はEndRead()ですバイト配列は返されませんが、読み込まれるバイト数は返されません。したがって、ある時点でオブジェクトまたは少なくとも閉包が必要です)。

class Message 
{ 
    public string Text { get; set; } 
} 

class MessageStream : IObservable<Message> 
{ 
    private readonly Subject<Message> messages = new Subject<Message>(); 

    public void Start() 
    { 
     // Get your real network stream here. 
     var stream = Console.OpenStandardInput(); 
     GetNextMessage(stream); 
    } 

    private void GetNextMessage(Stream stream) 
    { 
     var header = new byte[10]; 
     var read = Observable.FromAsyncPattern<byte [], int, int, int>(stream.BeginRead, stream.EndRead); 
     read(header, 0, 10).Subscribe(b => 
     { 
      var bodyLength = BitConverter.ToInt32(header, 0); 
      var body = new byte[bodyLength]; 
      read(body, 0, bodyLength).Subscribe(b2 => 
      { 
       var message = new Message() {Text = Encoding.UTF8.GetString(body)}; 
       messages.OnNext(message); 
       GetNextMessage(stream); 
      }); 
     }); 
    } 

    public IDisposable Subscribe(IObserver<Message> observer) 
    { 
     return messages.Subscribe(observer); 
    } 
} 
0

Observable.FromAsyncPatternは一度だけ非同期呼び出しを行っているので、あなたの代わりにそれを複数回呼び出す機能を行う必要があります。これはあなたを始めさせるはずですが、おそらく改善の余地があります。同じ引数を使用して非同期呼び出しを繰り返し行うことができると仮定し、selectorがこれに起因する問題を処理することを前提としています。

Function FromRepeatedAsyncPattern(Of T1, T2, T3, TCallResult, TResult)(
      begin As Func(Of T1, T2, T3, AsyncCallback, Object, IAsyncResult), 
      [end] As Func(Of IAsyncResult, TCallResult), 
      selector As Func(Of TCallResult, TResult), 
      isComplete As Func(Of TCallResult, Boolean) 
      ) As Func(Of T1, T2, T3, IObservable(Of TResult)) 
    Return Function(a1, a2, a3) Observable.Create(Of TResult)(
     Function(obs) 
      Dim serial As New SerialDisposable() 
      Dim fac = Observable.FromAsyncPattern(begin, [end]) 
      Dim onNext As Action(Of TCallResult) = Nothing 
      'this function will restart the subscription and will be 
      'called every time a value is found 
      Dim subscribe As Func(Of IDisposable) = 
       Function() 
        'note that we are REUSING the arguments, the 
        'selector should handle this appropriately 
        Return fac(a1, a2, a3).Subscribe(onNext, 
                Sub(ex) 
                 obs.OnError(ex) 
                 serial.Dispose() 
                End Sub) 
       End Function 
      'set up the OnNext handler to restart the observer 
      'every time it completes 
      onNext = Sub(v) 
         obs.OnNext(selector(v)) 
         'subscriber disposed, do not check for completion 
         'or resubscribe 
         If serial.IsDisposed Then Exit Sub 
         If isComplete(v) Then 
          obs.OnCompleted() 
          serial.Dispose() 
         Else 
          'using the scheduler lets the OnNext complete before 
          'making the next async call. 
          'you could parameterize the scheduler, but it may not be 
          'helpful, and it won't work if Immediate is passed. 
          Scheduler.CurrentThread.Schedule(Sub() serial.Disposable = subscribe()) 
         End If 
        End Sub 
      'start the first subscription 
      serial.Disposable = subscribe() 
      Return serial 
     End Function) 
End Function 

ここから、IObservable(Of Byte)そうのように取得することができます。

Dim buffer(4096 - 1) As Byte 
Dim obsFac = FromRepeatedAsyncPattern(Of Byte(), Integer, Integer, Integer, Byte())(
       AddressOf stream.BeginRead, AddressOf stream.EndRead, 
       Function(numRead) 
        If numRead < 0 Then Throw New ArgumentException("Invalid number read") 
        Console.WriteLine("Position after read: " & stream.Position.ToString()) 
        Dim ret(numRead - 1) As Byte 
        Array.Copy(buffer, ret, numRead) 
        Return ret 
       End Function, 
       Function(numRead) numRead <= 0) 
'this will be an observable of the chunk size you specify 
Dim obs = obsFac(buffer, 0, buffer.Length) 

はそこから、あなたは彼らが発見されたときにバイト配列出力の完全なメッセージを受け取り、アキュムレータ機能のいくつかの並べ替えが必要になります。このような関数のスケルトンは次のようになります。

Public Function Accumulate(source As IObservable(Of Byte())) As IObservable(Of Message) 
    Return Observable.Create(Of message)(
     Function(obs) 
      Dim accumulator As New List(Of Byte) 
      Return source.Subscribe(
       Sub(buffer) 
        'do some logic to build a packet here 
        accumulator.AddRange(buffer) 
        If True Then 
         obs.OnNext(New message()) 
         'reset accumulator 
        End If 
       End Sub, 
       AddressOf obs.OnError, 
       AddressOf obs.OnCompleted) 
     End Function) 
End Function 
関連する問題