2017-01-23 5 views
1

私はFuture.rsを使用して、別のプロセスでいくつかのタスクを管理しようとしています。私は、それぞれの未来を待つ方法と、それを次々に処理する方法を見ていますが、その状態を知るために実行中の未来をポーリングすることはできませんでした。私はいつもエラーがあります:待つことなく未来の状態をポーリングする方法?

thread 'main' panicked at 'no Task is currently running'

私はいつもそれが完了するまで何かしたいです。おそらく私はそれを正しい方法で使っていないでしょうか?私はそれをチャンネルを使って動作させることができましたが、未来をポーリングして準備ができたら結果を得ることができるはずです。私はそれをテストするために使用 コードは次のとおりです。

fn main() { 
    println!("test future"); 
    let thread_pool = CpuPool::new(4); 
    let mut future_execution_list = vec![]; 
    let mutex = Arc::new(AtomicUsize::new(0)); 
    //create the future to process 
    for _ in 0..10 { 
     let send_mutex = mutex.clone(); 
     let future = thread_pool.spawn_fn(move || { 
      //Simulate long processing 
      thread::sleep(time::Duration::from_millis(10)); 
      let num = send_mutex.load(Ordering::Relaxed); 
      send_mutex.store(num + 1, Ordering::Relaxed); 
      let res: Result<usize,()> = Ok(num); 
      res 

     }); 
     future_execution_list.push(future); 
    } 
    // do the job 
    loop { 
     for future in &mut future_execution_list { 
      match future.poll() { 
       Ok(Async::NotReady) =>(), //do nothing 
       Ok(Async::Ready(num)) => { 
        //update task status 
        println!(" future {:?}", num); 
       } 
       Err(_) => { 
        //log error and set task status to err 
        () 
       } 
      }; 
     } 
     //do something else 
    } 
} 

だから私はShepmasterの答えの後に私の質問を完了します。あなたの発言は非常に興味深いですが、私はまだ私の問題にsolutinを見つけることができません。私は自分の問題に関するいくつかの情報を追加します。私は一度にいくつかのタスクを管理できるオートメーション上でタスクをスケジュールしたい。イベントが管理され、タスクスケジューリングが計算されるループがあります。タスクがスケジュールされると、タスクが生成されます。タスクが終了すると、新しいスケジューリングが実行されます。タスクの実行中は、イベントが管理されます。 speudoのコードは次のようになります

loop { 
    event.try_recv() { ...} //manage user command for exemple 
    if (schedule) { 
     let tasks_to_spawn = schedule_task(); 
     let futures = tasks_to_spawn.map(|task| { 
      thread_pool.spawn_fn(....)}); 
     let mut one = future::select_all(futures); 
     while let Ok((value, _idx, remaining)) = one.wait() {..} //wait here 
    } 
    //depend on task end state and event set schedule to true or false. 

} 

私は将来的に共同スケジュールとタスクすることができますように:

let future = schedule.and_them(|task| execute_task); 

しかし、私はまだ最初のタスクの実行の終了を待つ必要があります。 私はすべてを将来に置くことができます(イベント管理、スケジュール、タスク)、提案するように終了する最初のものを待ちます。私は試してみましたが、別のアイテムとエラーのタイプで未来のvecを作る方法を見ていませんでした。そして、この概念では、スレッド間でより多くのデータを管理する必要があります。イベント管理とスケジューリングを別のスレッドで実行する必要はありません。

別の問題があります。select_allはvecの所有権を持ちます。新しいタスクが他のタスクの実行中にスケジュールされなければならない場合、どのようにしてvecを変更して新しい将来を追加することができますか?

単純な解決策があるかどうかわかりません。私はisDone()のようなメソッドを使って、その実行中に未来の状態を待たずに得るのは簡単だと考えていました。おそらくそれは計画されている、私はそれについてのPRを見ていない。 シンプルなソリューションをお持ちでしたら、私の考え方を再考してください。

+0

このように原子変数を使用したくない場合は99.9%です**。代わりに 'fetch_add'が必要です。大多数の人々は' Relaxed'の注文を望んでいません。 – Shepmaster

+0

あなたの権利は、他の先物の実行に依存する将来の実行から結果を得たいということを示すために、いくつかのコードをコピー/ペーストするだけです。 –

+0

あなたが回答を受け取った後、特に変更がそれらの回答を無効にしたときに、あなたの質問を変更することは非常に悪いフォームです*(http://meta.stackoverflow.com/q/309237/155423)。質問者は最初から関連する詳細を含む良い質問をすることが質問者にあります。 – Shepmaster

答えて

2

Futureをポーリングするには、Taskが必要です。 Taskを取得するには、Futurefutures::executor::spawn()に渡して投票できます。例:

futures::executor::spawn(futures::lazy(|| { 
    // existing loop goes here 
})).wait_future(); 

実行すると、loopのように書き換えます。 Futureが唯一のタスクでポーリングすることができる理由については

、私はそのポーリングがTask::Unparkを呼び出すことができ、それはそうだと信じています。私はそれを理解したよう

0

I want to do something during the future processing

は、それが何ある先物 - 並行して起こることができるもの。あなたが何か他のことをしたいなら、もう一度将来を作り、それを投げ込んでください!

あなたは基本的に既にこれを行っています - あなたのスレッドはそれぞれ「何か他のことをしています」。

poll the future and when it's ready get the result

future::select_all使用して、複数の先物を組み合わせることができますし、最初の終了いずれかを取得します。それで、次のものを待つことにします。

一つの潜在的な実装:waitへの通話中に

extern crate rand; 
extern crate futures; 
extern crate futures_cpupool; 

use rand::Rng; 
use futures::{future, Future}; 
use futures_cpupool::CpuPool; 

use std::{thread, time}; 

fn main() { 
    let thread_pool = CpuPool::new(4); 

    let futures = (0..10).map(|i| { 
     thread_pool.spawn_fn(move || -> Result<usize,()> { 
      let mut rng = rand::thread_rng(); 
      // Simulate long processing 
      let sleep_time = rng.gen_range(10, 100); 
      let sleep_time = time::Duration::from_millis(sleep_time); 
      for _ in 0..10 { 
       println!("Thread {} sleeping", i); 
       thread::sleep(sleep_time); 
      } 
      Ok(i) 
     }) 
    }); 

    let mut one = future::select_all(futures); 
    while let Ok((value, _idx, remaining)) = one.wait() { 
     println!("Future #{} finished", value); 
     if remaining.is_empty() { 
      break; 
     } 
     one = future::select_all(remaining); 
    } 
} 

、複数のものが同時に起こっています!これは、インターリーブされた出力で見ることができます。

Thread 2 sleeping 
Thread 0 sleeping 
Thread 3 sleeping 
Thread 1 sleeping 
Thread 3 sleeping 
Thread 0 sleeping 
Thread 1 sleeping 
Thread 2 sleeping 
Thread 3 sleeping 

あなたがスレッドごとに1秒にスリープ時間を設定し、プログラム全体のタイミングによって物事が並行して起こっていることを確認することができます。 10の未来があるので、並列性4で1秒かかると、全体のプログラムは3秒かかる。


ボーナスコードレビュー:

  1. スプリットないロードを行うとインクリメントを実装するためにアトミック変数を設定 - 格納された値が2つの呼び出しの間に別のスレッドによって変更された可能性があります。 fetch_addを使用してください。
  2. 実際には、what the orderings meanを知っている必要があります。私はそれらを知らないので、私はいつもSeqCstを使用します。
  3. この例では重要ではなかったので、私は原子変数を完全に削除しました。
  4. ループ内でループするのではなく、いつもVecに収集するほうが好きです。これにより、より最適化された割り当てが可能になります。
  5. この場合、Vecの必要はありません。select_allはイテレータに変換できるものをすべて受け入れるためです。
関連する問題