2011-06-30 20 views
7

これに関するいくつかの記事がありますが、これは機能していますが、Observableサブスクリプションの最大数のタスクスレッドを一度に設定する方法を知りたいと思います。リアクティブエクステンションを使用した非同期キュー処理

私は、ログエントリの非同期省並列化するために、次のしている:私の節約をスケジュールするには

private BlockingCollection<ILogEntry> logEntryQueue; 

logEntryQueue = new BlockingCollection<ILogEntry>(); 
logEntryQueue.GetConsumingEnumerable().ToObservable(Scheduler.TaskPool).Subscribe(SaveLogEntry); 

を...しかし、どのように私はへのスケジューラの最大スレッドを指定してください一度に使用しますか?

+0

最大同時スレッドの制限のみにスケジューラを作成する場合は、TPLデータフローを検討することを検討してください。これは、パイプライン内の各ブロックが並行性に対して異なる制限を持つパイプラインを作成するために特別に構築されました。私は両方の方法を試してみると、少なくとも分かりやすく保守しやすいものでした。 – yzorg

答えて

8

これはObservableの機能ではなく、Schedulerの機能です。 Observableはを定義し、とスケジューラはを定義します。です。

カスタムスケジューラを渡す必要があります。これを行う簡単な方法は、TaskSchedulerをサブクラス化し、 "MaximumConcurrencyLevel"プロパティをオーバーライドすることです。

http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.maximumconcurrencylevel.aspx

私は実際にはMSDNでこのサンプルが見つかりました:

http://msdn.microsoft.com/en-us/library/ee789351.aspx

編集:あなたはTaskSchedulerからISchedulerに行く方法についての質問を。あなたは(。すなわち、それらはに加入するまで何もしたい)繰延実行をIObservable<T>としてあなたの「仕事」を作成した場合

var ischedulerForRx = new TaskPoolScheduler 
(
    new TaskFactory 
    (
     //This is your custom scheduler 
     new LimitedConcurrencyLevelTaskScheduler(1) 
    ) 
); 
+1

素晴らしい、ありがとう...しかし、私は新しいTaskSchedulerのためのReactive ISchedulerをどうやって得るのですか?私は全体の機能を再実装したくありません。 – Jeff

2

、あなたが受け入れるMergeオーバーロードを使用することができます別の開発者は、単に情報の少しことを私に与えました

ISubject<QueueItem> synchronizedQueue = new Subject<QueueItem>().Synchronize(); 

queue 
    .Select(item => StartWork(item)) 
    .Merge(maxConcurrent: 5) // C# 4 syntax for illustrative purposes 
    .Subscribe(); 

// To enqueue: 
synchronizedQueue.OnNext(new QueueItem()); 
+0

これは、キューの処理中に他のスレッドをキューに追加する方法をどのように処理しますか?列挙中にコレクションが変更されることはありませんか? – Jeff

+0

'Merge'演算子はスレッドセーフですが、スレッドセーフである場合に備えて、その答えを更新しました。 –

+0

これは単なる私の混乱だと思います...それは私が懸念しているスレッドの安全性ではなく、むしろ列挙されているので、キューが変更されている(他のアイテムはエンキューされています)...これは問題ではありません? – Jeff

関連する問題