2017-03-07 1 views
2

多くのブロックアクションのタイムアウトのシグナルに依存するアプリケーションがあります。例えばpythonスレッドのタイムアウト信号の代替手段

def wait_timeout(signum, frame): 
    raise Exception("timeout") 

signal.signal(signal.SIGALRM, wait_timeout) 
signal.setitimer(signal.ITIMER_REAL, 5) 

try: 
    while true: 
     print("zzz") 
     sleep(1) 
except Exception as e: 
    # timeout 
    print("Time's up") 

は今、私は同じアプローチを使用してマルチスレッドを実装しましたが、すべてのスレッドで私がValueError: signal only works in main threadを取得します。

シグナルでタイムアウトする方法はスレッドには適用できないと仮定します。 whileループでの操作がブロックし、おそらく永遠に続くされるように、したがってループはif節に到達しない可能性があり、

timeout = 5 
start = time.time() 

while true: 
    print("zzz") 
    sleep(1) 
    if time.time() <= start+timeout: 
     print("Time's up) 
     break 

残念ながら、私はこのようなものを使用することはできません。

Q:信号でスレッドを実行していたように、タイムアウトを実装する方法を教えてください。

編集:私はPythonでJavaScriptでsetTimeout()と同様のソリューションを示し、this blog postに遭遇しました。これは可能な解決策かもしれないと私は想定していますが、実際にどのように使用するのかは分かりません。

EDIT2:以下のようには私がメインでスレッドを開始:

p = Popen(["tool", "--param", arg], stdin=PIPE, stdout=PIPE, stderr=STDOUT) 
t = Thread(target=process_thread, daemon=True, args=(p,arg1,arg2)) 
t.start() 

process_thread関数は以下のようにして、toolの標準出力を処理します。

for line in p.stdout: 
    # process line of the processes stdout 

この処理することができます永遠に、例えば取るtoolは出力を生成しません。 toolの出力は5秒としましょう。したがって、特定のタイムアウト後にforループを壊す必要があります。

これは私が信号を使用したものですが、明らかにスレッドでは機能しません。

edit3:私はスレッドで歌を使用する方法について、より洗練された正確な例を作成しました。 See the gist here

+0

はどのようにあなたのスレッドを開始していますか?定義するときに 'daemon = True'を設定していますか?もしそうなら、それらのスレッドは 'main'スレッドが死んだときに殺されます。それはあなたがやろうとしていることですか? – Billy

+0

はい、私はデーモンとしてスレッドを開始しています。私は1分でOPを編集します。いいえ、それは私が試みているものではありません、私はOPでそれをよりよく説明しようとします。 – SaAtomic

+0

私はOP @ Billyを更新しました – SaAtomic

答えて

2

あなたが探しているのはウォッチドッグです。

def watchdog(queue): 
    while True: 
     watch = queue.get() 
     time.sleep(watch.seconds) 

     try: 
      watch = queue.get_nowait() 
      # No except, got queue message, 
      # do noting wait for next watch 

     except queue.Empty: 
      os.kill(watch.pid, signal.SIGKILL) 

def workload_thread(queue): 
    pid = os.getpid() 
    queue.put({'pid':pid, 'seconds':5}) 

    # do your work 
    # Test Watchdog 
    # time.sleep(6) 

    queue.put({'pid':pid, 'done':True}) 

注:コードがテストされていない、構文エラーがあるかもしれません!

+0

これは以前見たことがありますが、今のところそれを頭に浮かべてください。ウォッチドッグのトピックを調べると、私も[this](http://liveincode.blogspot.de/2012/11/watchdog-timer-in-python.html)が見つかりましたが、まだどこから開始するのかはわかりません。あなたはとても親切で、私が投稿したコードの例を作成しますか? – SaAtomic

1

これはThreads Popen processに与えられたtimeout=5signal.SIG...を送信class Terminator、 を実装しています。複数の異なるpidが可能です。

class Terminator(object): 
    class WObj(): 
     def __init__(self, process, timeout=0, sig=signal.SIGABRT): 
      self.process = process 
      self.timeout = timeout 
      self.sig = sig 

    def __init__(self): 
     self.__queue = queue.Queue() 
     self.__t = Thread(target=self.__sigterm_thread, args=(self.__queue,)) 
     self.__t.start() 
     time.sleep(0.1) 

    def __sigterm_thread(self, q): 
     w = {} 
     t = 0 
     while True: 
      time.sleep(0.1); 
      t += 1 
      try: 
       p = q.get_nowait() 
       if p.process == 0 and p.sig == signal.SIGTERM: 
        # Terminate sigterm_thread 
        return 1 

       if p.process.pid not in w: 
        if p.timeout > 0 and p.sig != signal.SIGABRT: 
         w[p.process.pid] = p 
       else: 
        if p.sig == signal.SIGABRT: 
         del (w[p.process.pid]) 
        else: 
         w[p.process.pid].timeout = p.timeout 

      except queue.Empty: 
       pass 

      if t == 10: 
       for key in list(w.keys()): 
        p = w[key] 
        p.timeout -= 1 
        if p.timeout == 0: 
         """ A None value indicates that the process hasn't terminated yet. """ 
         if p.process.poll() == None: 
          p.process.send_signal(p.sig) 
         del (w[p.process.pid]) 
       t = 0 
      # end if t == 10 
     # end while True 

    def signal(self, process, timeout=0, sig=signal.SIGABRT): 
     self.__queue.put(self.WObj(process, timeout, sig)) 
     time.sleep(0.1) 

    def close(self, process): 
     self.__queue.put(self.WObj(process, 0, signal.SIGABRT)) 
     time.sleep(0.1) 

    def terminate(self): 
     while not self.__queue.empty(): 
      trash = self.__queue.get() 

     if self.__t.is_alive(): 
      self.__queue.put(self.WObj(0, 0, signal.SIGTERM)) 

    def __enter__(self): 
     return self 

    def __exit__(self, exc_type, exc_val, exc_tb): 
     self.__del__() 

    def __del__(self): 
     self.terminate() 

これは、インスタンスのワークロードです:

はPythonでテスト
def workload(n, sigterm): 
    print('Start workload(%s)' % n) 
    arg = str(n) 
    p = Popen(["tool", "--param", arg], stdin=PIPE, stdout=PIPE, stderr=STDOUT) 

    sigterm.signal(p, timeout=4, sig=signal.SIGTERM) 
    while True: 
     for line in p.stdout: 
      # process line of the processes stdout 
      print(line.strip()) 
      time.sleep(1) 

     if p.poll() != None: 
      break 

    sigterm.close(p) 
    time.sleep(0.1) 
    print('Exit workload(%s)' % n) 

if __name__ == '__main__': 
    with Terminator() as sigterm: 
     p1 = Thread(target=workload, args=(1, sigterm)); p1.start(); time.sleep(0.1) 
     p2 = Thread(target=workload, args=(2, sigterm)); p2.start(); time.sleep(0.1) 
     p3 = Thread(target=workload, args=(3, sigterm)); p3.start(); time.sleep(0.1) 
     p1.join(); p2.join(); p3.join() 

     time.sleep(0.5) 
    print('EXIT __main__') 

:3.4.2とPython:2.7.9

+0

フィードバックとサンプルをありがとう、私はOPで述べたように、複数のスレッドではうまくいかないと思います。私はOPとしてリンクとしてより精巧なコードスニペットを追加しました。 – SaAtomic

+0

また、そのソリューションを複数のスレッドに拡張しようとすると、すべてのスレッドが同じPIDを報​​告するという問題が発生します。 – SaAtomic

+1

はい、スケーラブルではありません。あなたのより精巧なコードスニペットを読んだ後、scalabelソリューションで戻ってきます。 – stovfl