2012-09-07 7 views
15

私は2つのthreading.Event()オブジェクトを持っていて、いずれか1つが設定されるまでスリープ状態にしたい場合は、それを効率的に行う方法がありますか?明らかに私はポーリング/タイムアウトを使って何かをすることができましたが、ファイルディスクリプタにどのようにselectが使用されているかに似ています。Pythonスレッディング:2つのスレッドで眠れますか?

したがって、次の実装では、wait_for_eitherの効率的な非ポーリング実装はどのように見えますか?

a = threading.Event() 
b = threading.Event() 

wait_for_either(a, b) 
+0

2つの異なるイベントを使用し、同じイベントを使用しない理由はありますか? –

+0

@Iuliusイベント駆動型のスレッドを1つ持っていますが、2つのキューがあります... qがアイテムを取得したときに目を覚ます必要があります – pyInTheSky

+0

Pythonにはこの組み込み関数がありません。 –

答えて

18

非ポーリング非過度のスレッドソリューションです:彼らは変更するたびにコールバックを発射するために、既存のEvent Sを変更し、そのコールバックで新しいイベントを設定扱う:

import threading 

def or_set(self): 
    self._set() 
    self.changed() 

def or_clear(self): 
    self._clear() 
    self.changed() 

def orify(e, changed_callback): 
    e._set = e.set 
    e._clear = e.clear 
    e.changed = changed_callback 
    e.set = lambda: or_set(e) 
    e.clear = lambda: or_clear(e) 

def OrEvent(*events): 
    or_event = threading.Event() 
    def changed(): 
     bools = [e.is_set() for e in events] 
     if any(bools): 
      or_event.set() 
     else: 
      or_event.clear() 
    for e in events: 
     orify(e, changed) 
    changed() 
    return or_event 

を使用例:

def wait_on(name, e): 
    print "Waiting on %s..." % (name,) 
    e.wait() 
    print "%s fired!" % (name,) 

def test(): 
    import time 

    e1 = threading.Event() 
    e2 = threading.Event() 

    or_e = OrEvent(e1, e2) 

    threading.Thread(target=wait_on, args=('e1', e1)).start() 
    time.sleep(0.05) 
    threading.Thread(target=wait_on, args=('e2', e2)).start() 
    time.sleep(0.05) 
    threading.Thread(target=wait_on, args=('or_e', or_e)).start() 
    time.sleep(0.05) 

    print "Firing e1 in 2 seconds..." 
    time.sleep(2) 
    e1.set() 
    time.sleep(0.05) 

    print "Firing e2 in 2 seconds..." 
    time.sleep(2) 
    e2.set() 
    time.sleep(0.05) 

結果はあった:

Waiting on e1... 
Waiting on e2... 
Waiting on or_e... 
Firing e1 in 2 seconds... 
e1 fired!or_e fired! 

Firing e2 in 2 seconds... 
e2 fired! 

のThスレッドセーフである必要があります。コメントは大歓迎です。

編集:ああ、あなたのwait_for_either関数ですが、私がコードを書いた方法でも、or_eventを作成して渡すことをお勧めします。 or_eventは手動で設定または消去しないでください。

def wait_for_either(e1, e2): 
    OrEvent(e1, e2).wait() 
+2

これはいいね!しかし、1つの問題があります。同じイベントを2回「orify」すると、そのイベントを設定またはクリアするたびに無限ループが発生します。 – Vincent

+0

それは良い点です!すぐに修正されます – Claudiu

+0

ありがとうございました!それはまさに私が探していたものです。この回答のコードをオープンソースのライセンス条件で使用されることに同意しますか?彼らはNumpy、Pandas、Scipyなどと互換性があるので、BSDやMITは理想的でしょう。 – naitsirhc

4

一つの解決策(ポーリングとは)私はあなたがチューンタイムアウトを十分にあれば結果はOKだろうと考えてループ内の各Event

def wait_for_either(a, b): 
    while True: 
     if a.wait(tunable_timeout): 
      break 
     if b.wait(tunable_timeout): 
      break 

をシーケンシャル待機を行うことであろう。


私は考えることができる最高の非ポーリングは別のスレッドで各1を待つ、あなたがメインスレッドにした後に待機する共有Eventを設定することです。

def repeat_trigger(waiter, trigger): 
    waiter.wait() 
    trigger.set() 

def wait_for_either(a, b): 
    trigger = threading.Event() 
    ta = threading.Thread(target=repeat_trigger, args=(a, trigger)) 
    tb = threading.Thread(target=repeat_trigger, args=(b, trigger)) 
    ta.start() 
    tb.start() 
    # Now do the union waiting 
    trigger.wait() 

かなり面白いので、私は以前のソリューションのOOPのバージョンを書いた:

class EventUnion(object): 
    """Register Event objects and wait for release when any of them is set""" 
    def __init__(self, ev_list=None): 
     self._trigger = Event() 
     if ev_list: 
      # Make a list of threads, one for each Event 
      self._t_list = [ 
       Thread(target=self._triggerer, args=(ev,)) 
       for ev in ev_list 
      ] 
     else: 
      self._t_list = [] 

    def register(self, ev): 
     """Register a new Event""" 
     self._t_list.append(Thread(target=self._triggerer, args=(ev,))) 

    def wait(self, timeout=None): 
     """Start waiting until any one of the registred Event is set""" 
     # Start all the threads 
     map(lambda t: t.start(), self._t_list) 
     # Now do the union waiting 
     return self._trigger.wait(timeout) 

    def _triggerer(self, ev): 
     ev.wait() 
     self._trigger.set() 
+1

あなたはrepeat_triggerもトリガーをチェックすることができます(トリガーのタイムアウト= 0、ウェイターのタイムアウト> 0)。すべてのスレッドが最終的に終了します –

+0

同じ考えですが、2スレッドを開始するよりも良い方法です... – Claudiu

0

はきれいではありませんが、しかし、あなたは、イベントを多重化するために、2つの追加のスレッドを使用することができます...

def wait_for_either(a, b): 
    flag = False #some condition variable, event, or similar 

    class Event_Waiter(threading.Thread): 
    def __init__(self, event): 
     self.e = event 
    def run(self): 
     self.e.wait() 
     flag.set() 

    a_thread = Event_Waiter(a) 
    b_thread = Event_Waiter(b) 
    a.start() 
    b.start() 
    flag.wait() 

非常に早く到着した場合、両方のイベントを誤って受信する心配があります。ヘルパースレッド(a_threadとb_thread)は、フラグを設定しようとすると同期してロックし、別のスレッドを強制終了する必要があります(消費された場合、そのスレッドのイベントをリセットする可能性があります)。ここで

1

余分なスレッドを開始することは明らかな解決策に見えますが、それほど効果的ではありません。 関数wait_eventsは、イベントのいずれか1つが設定されたutilをブロックします。

def wait_events(*events): 
    event_share = Event() 

    def set_event_share(event): 
     event.wait() 
     event.clear() 
     event_share.set() 
    for event in events: 
     Thread(target=set_event_share(event)).start() 

    event_share.wait() 

wait_events(event1, event2, event3) 
+0

どちらがトリガーされたか知っているといいでしょう – Har

0
def wait_for_event_timeout(*events): 
    while not all([e.isSet() for e in events]): 
     #Check to see if the event is set. Timeout 1 sec. 
     ev_wait_bool=[e.wait(1) for e in events] 
     # Process if all events are set. Change all to any to process if any event set 
     if all(ev_wait_bool): 
      logging.debug('processing event') 
     else: 
      logging.debug('doing other work') 


e1 = threading.Event() 
e2 = threading.Event() 

t3 = threading.Thread(name='non-block-multi', 
         target=wait_for_event_timeout, 
         args=(e1,e2)) 
t3.start() 

logging.debug('Waiting before calling Event.set()') 
time.sleep(5) 
e1.set() 
time.sleep(10) 
e2.set() 
logging.debug('Event is set') 
1

あなたがイベント1またはイベント2またはイベント1を待つことができますいずれかClaudiu's答え、さらには2

from threading import Thread, Event, _Event 

class ConditionalEvent(_Event): 
    def __init__(self, events_list, condition): 
     _Event.__init__(self) 

     self.event_list = events_list 
     self.condition = condition 

     for e in events_list: 
      self._setup(e, self._state_changed) 

     self._state_changed() 

    def _state_changed(self): 
     bools = [e.is_set() for e in self.event_list] 
     if self.condition == 'or': 

      if any(bools): 
       self.set() 
      else: 
       self.clear() 

     elif self.condition == 'and': 

      if all(bools): 
       self.set() 
      else: 
       self.clear() 

    def _custom_set(self,e): 
     e._set() 
     e._state_changed() 

    def _custom_clear(self,e): 
     e._clear() 
     e._state_changed() 

    def _setup(self, e, changed_callback): 
     e._set = e.set 
     e._clear = e.clear 
     e._state_changed = changed_callback 
     e.set = lambda: self._custom_set(e) 
     e.clear = lambda: self._custom_clear(e) 

使用例を拡張する前と非常に似ています

import time 

e1 = Event() 
e2 = Event() 

or_e = ConditionalEvent([e1, e2], 'or') 


Thread(target=wait_on, args=('e1', e1)).start() 
time.sleep(0.05) 
Thread(target=wait_on, args=('e2', e2)).start() 
time.sleep(0.05) 
Thread(target=wait_on, args=('or_e', or_e)).start() 
time.sleep(0.05) 

print "Firing e1 in 2 seconds..." 
time.sleep(2) 
e1.set() 
time.sleep(0.05) 

print "Firing e2 in 2 seconds..." 
time.sleep(2) 
e2.set() 
time.sleep(0.05) 
関連する問題