2016-12-01 6 views
1

可観測ストリームからのメッセージを処理するときに、非同期IO(WriteAsyn、SaveAsyncなど)を使用するにはどうすればよいですか?Reactive Extensions(Rx)を使用している場合にObservableを処理するためのAsync IOの使用

私は現在、クライアントソケットから読み取ったメッセージを観測可能なものとしてストリーミングするためにRxを使用しています。単純化されたバージョンは、拡張メソッドを使用して、次のようになります。私のリポジトリはIO非同期使用を強調するのみTPL非同期非ブロッキングメソッドを持っているので、今

var messagesFromClient = socket.ReadChunksAsObservable() 
    .ScanChunksIntoFrames() 
    .MapFrameToString() 
    .DoLogString() 
    .FilterEmptyStrings() 
    .MapToMessageObject(); 

が、私は、私のリポジトリにこのメッセージを保存する方法については、苦労しています(たとえば、 await dbConnection.SaveAsync(...)

私が読んでテストしたものから、それだけのような何かをすることはできません。

私はIO操作非同期それらを使用することができます
messagesFromClient.Subscribe(async message=>{ 
    await myRepo.SaveAsync(); 
}); 

を?私はそれらを副作用として扱うべきですか?どんな例?

答えて

1

あなたはObservable.FromAsyncを使用してasync方法からObservableを作成することができ、その後、この観測可能にmessagesFromClientをflatmap:

messagesFromClient 
    .SelectMany(message => Observable.FromAsync(() => myRepo.SaveAsync(message))) 
    .Subscribe(); 
+1

それは理にかなっています。しかし、SelectManyはメッセージを同時に処理することはできませんか?たぶん、マージ()がそれをするだろう!ありがとう、私はそれを試してみます。 –

+0

@FilipeBorges: 'SelectMany'は、各要素をobservable(この場合はasync関数を呼び出すことによって)に投影し、これらのobservablesを' merge 'するので、メッセージは同時に処理されます( 'SaveAsync'が実際に並行性を導入する場合)。 –

関連する問題