あなたの観測値は熱くありません。それは共有された情報源を用いた寒い観察であり、それはあたかも熱心に観察できるように後続の観察者を行動させるだけです。これはおそらく、暖かい観察可能性として最もよく説明されています。
例を見てみましょう:
var query = Observable.Range(0, 3).ObserveOn(Scheduler.Default).Publish().RefCount();
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });
Thread.Sleep(10000);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.Publish()
.RefCount()
.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("E"); });
私はこれを実行すると、私が得る:
A
A
B
C
A
B
C
E
E
E
"B" & "C" オブザーバーは、シーケンスの最初の値を欠場。
"A"、 "B"、 "C"オブザーバが終了した後、シーケンスが終了するので、 "D"は決して値を取得しません。私は、値 "E"を表示するために、新しい観測値を作成しなければなりませんでした。
あなたのコードでは、最初のオブザーバーが2番目と3番目のサブスクライブ前に1つ以上の値を終了すると、そのオブザーバーは値を見逃します。それはあなたが欲しいものですか?
しかし、あなたの質問では、観測値から返された使い捨ての値を扱う方法について質問しています。 Observable.Using
を使用すると簡単です。
ここにあなたのコードに似たような状況ません:私はこのコードを実行する場合
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)))
.Publish()
.RefCount();
}
は今:
var query = ImagesInFolder(Scheduler.Default);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });
Thread.Sleep(10000);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });
を、私はこの出力を得る:再び
A
B
C
Disposed!
A
B
C
Disposed!
A
B
C
Disposed!
"D" 決して任意の値を生成します。 "B" & "C"が値を欠いている可能性がありますが、これは返す方法を示しています自動的にオブザーバに配置される観察可能な値が終了する。
は、あなたのコードは次のようになります。
public static IObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
return
Directory
.GetFiles(path, "*.bmp")
.ToObservable(scheduler)
.SelectMany(x =>
Observable
.Using(
() => new System.Drawing.Bitmap(x),
bm => Observable.Return(bm)))
.Publish()
.RefCount();
}
しかし、あなたはおそらく、欠損値の地ではまだです。
は、したがって、あなたが本当にこれを行う必要があります。
public static IConnectableObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
return
Directory
.GetFiles(path, "*.bmp")
.ToObservable(scheduler)
.SelectMany(x =>
Observable
.Using(
() => new System.Drawing.Bitmap(x),
bm => Observable.Return(bm)))
.Publish();
}
を次に、あなたがこのようにそれを呼び出す:
public void Main()
{
var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance);
var process1 = images.Subscribe(SaveBwImages);
var process2 = images.Subscribe(SaveScaledImages);
var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
images.Connect();
}
他のオプションは全体.Publish().RefCount()
コードをドロップすると、あなたはそれを正しく行うことを確認することですあなたが購読するときあなた自身。
このコードを試してみてください。
void Main()
{
ImagesInFolder(Scheduler.Default)
.Publish(iif =>
Observable
.Merge(
iif.Select(x => { Thread.Sleep(1000); Console.WriteLine("A"); return "A"; }),
iif.Select(x => { Thread.Sleep(3000); Console.WriteLine("B"); return "B"; }),
iif.Select(x => { Thread.Sleep(2000); Console.WriteLine("C"); return "C"; })))
.Subscribe();
}
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)));
}
私はこれを取得:
再び
A
B
C
Disposed!
A
B
C
Disposed!
A
B
C
Disposed!
、各観察者が実行しているが、問題は、今、私は処理の遅延を変更したことである1 Disposed!
後しかし、まだ出力されているコードは、オブザーバーが追加された順序です。問題は、Rxが各オブザーバを順番に実行し、生成される各値が順番に並んでいることです。
.Publish()
を使用して並列処理が行われる可能性があります。あなたはそうしない。
これを並列実行する方法は、.Publish()
を完全に削除することです。私は今、この取得
void Main()
{
ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(3000); Console.WriteLine("B"); });
ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(2000); Console.WriteLine("C"); });
}
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)));
}
:: - 、正しくIDisposable
ときの処分
A
Disposed!
C
Disposed!
A
Disposed!
B
Disposed!
A
Disposed!
C
Disposed!
C
Disposed!
B
Disposed!
B
Disposed!
コードは今、並列に実行され、できるだけ速く終わる
はただ、この種のものを行いますサブスクリプションは終了します。各観察者と1つの使い捨てリソースを共有するという喜びは得られませんが、行動上の問題のすべてを解決するわけでもありません。
omg、それは私が想像できる最高の答えです!どうもありがとうございました! – Dmitry