Observable.FromAsyncPattern
は一度だけ非同期呼び出しを行っているので、あなたの代わりにそれを複数回呼び出す機能を行う必要があります。これはあなたを始めさせるはずですが、おそらく改善の余地があります。同じ引数を使用して非同期呼び出しを繰り返し行うことができると仮定し、selector
がこれに起因する問題を処理することを前提としています。
Function FromRepeatedAsyncPattern(Of T1, T2, T3, TCallResult, TResult)(
begin As Func(Of T1, T2, T3, AsyncCallback, Object, IAsyncResult),
[end] As Func(Of IAsyncResult, TCallResult),
selector As Func(Of TCallResult, TResult),
isComplete As Func(Of TCallResult, Boolean)
) As Func(Of T1, T2, T3, IObservable(Of TResult))
Return Function(a1, a2, a3) Observable.Create(Of TResult)(
Function(obs)
Dim serial As New SerialDisposable()
Dim fac = Observable.FromAsyncPattern(begin, [end])
Dim onNext As Action(Of TCallResult) = Nothing
'this function will restart the subscription and will be
'called every time a value is found
Dim subscribe As Func(Of IDisposable) =
Function()
'note that we are REUSING the arguments, the
'selector should handle this appropriately
Return fac(a1, a2, a3).Subscribe(onNext,
Sub(ex)
obs.OnError(ex)
serial.Dispose()
End Sub)
End Function
'set up the OnNext handler to restart the observer
'every time it completes
onNext = Sub(v)
obs.OnNext(selector(v))
'subscriber disposed, do not check for completion
'or resubscribe
If serial.IsDisposed Then Exit Sub
If isComplete(v) Then
obs.OnCompleted()
serial.Dispose()
Else
'using the scheduler lets the OnNext complete before
'making the next async call.
'you could parameterize the scheduler, but it may not be
'helpful, and it won't work if Immediate is passed.
Scheduler.CurrentThread.Schedule(Sub() serial.Disposable = subscribe())
End If
End Sub
'start the first subscription
serial.Disposable = subscribe()
Return serial
End Function)
End Function
ここから、IObservable(Of Byte)
そうのように取得することができます。
Dim buffer(4096 - 1) As Byte
Dim obsFac = FromRepeatedAsyncPattern(Of Byte(), Integer, Integer, Integer, Byte())(
AddressOf stream.BeginRead, AddressOf stream.EndRead,
Function(numRead)
If numRead < 0 Then Throw New ArgumentException("Invalid number read")
Console.WriteLine("Position after read: " & stream.Position.ToString())
Dim ret(numRead - 1) As Byte
Array.Copy(buffer, ret, numRead)
Return ret
End Function,
Function(numRead) numRead <= 0)
'this will be an observable of the chunk size you specify
Dim obs = obsFac(buffer, 0, buffer.Length)
はそこから、あなたは彼らが発見されたときにバイト配列出力の完全なメッセージを受け取り、アキュムレータ機能のいくつかの並べ替えが必要になります。このような関数のスケルトンは次のようになります。
Public Function Accumulate(source As IObservable(Of Byte())) As IObservable(Of Message)
Return Observable.Create(Of message)(
Function(obs)
Dim accumulator As New List(Of Byte)
Return source.Subscribe(
Sub(buffer)
'do some logic to build a packet here
accumulator.AddRange(buffer)
If True Then
obs.OnNext(New message())
'reset accumulator
End If
End Sub,
AddressOf obs.OnError,
AddressOf obs.OnCompleted)
End Function)
End Function