2016-09-12 3 views
0

私はプロデューサ - コンシューマパターンのQueueを持っています。それは入ってくるイベントを消費し、5秒以内に送信された修飾イベントをスケジュールします。私はそれを行うにはthreading.Timer()python documentを使用してすべてがうまくいっていた。スレッドの代わりに。タイマー?

最近、スケジュールされた時間を5秒から30分に変更するように要求されました。以前はスレッドオブジェクトが作成され、すぐに解放されるため(最後の5秒間)生きて30分。ここで

は、コードは次のとおりです。

if scheduled_time and out_event: 
    threading.Timer(scheduled_time, self.send_out_event, (socket_connection, received_event, out_event,)).start() # schedule event send out 

がこれにいくつかの光を当てるsomesoneすることはできますか?この問題を解決するにはどうすればよいですか、threading.Timer()の代替手段はありますか?

+0

[スケジュール](同様に、ジョブのスケジュールのためのいくつかのサードパーティのモジュールを見てみhttps://github.com/dbader/schedule )または[apscheduler](https://github.com/agronholm/apscheduler)を参照してください。 – dano

+0

@ danoあなたのコメントのおかげで、私はそれを読んで本当に便利です! – haifzhan

答えて

0

サードパーティ製のモジュールに関する@danoのコメントありがとうございます!私の仕事の要件に基づいて、私はそれらをサーバーにインストールしませんでした。

threading.Timer()を使用する代わりに、私はRedisベースの遅延キューを使用することを選択しました。オンラインで便利なソースが見つかりました:A unique Python redis-based queue with delayそれは私の問題を解決した。

簡潔に言えば、著者はソートされたセットをredisで作成し、それに名前を付けます。add()はソートされたセットに新しいデータを追加します。それはEPOC時間のスコアに基づいてソートセットから高々要素をポップするたびに、資格の最小スコアが飛び出すことになる保持要素(Redisのから削除されません)

def add(self, received_event, delay_queue_name="delay_queue", delay=config.SECOND_RETRY_DELAY): 
    try: 
     score = int(time.time()) + delay 
     self.__client.zadd(delay_queue_name, score, received_event) 
     self.__logger.debug("added {0} to delay queue, delay time:{1}".format(received_event, delay)) 
    except Exception as e: 
     self.__logger.error("error: {0}".format(e)) 


def pop(self, delay_queue_name="delay_queue"): 
    min_score, max_score, element = 0, int(time.time()), None 

    try: 
     result = self.__client.zrangebyscore(delay_queue_name, min_score, max_score, start=0, num=1, withscores=False) 
    except Exception as e: 
     self.__logger.error("failed query from redis:{0}".format(e)) 
     return None 

    if result and len(result) == 1: 
     element = result[0] 
     self.__logger.debug("poped {0} from delay queue".format(element)) 
    else: 
     self.__logger.debug("no qualified element") 
    return element 

def remove(self, element, delay_queue_name="delay_queue"): 
    self.__client.zrem(delay_queue_name, element) 

self.__clientですRedisクライアントインスタンスredis.StrictRedis(host=rhost,port=rport, db=rindex)

私のオンラインソースとの違いは、zadd()パラメータを切り替えたことです。 scoredataの順番が入れ替わります。以下はPythonのRedisのドキュメントここzadd()

のドキュメントだです:

# SORTED SET COMMANDS 
def zadd(self, name, *args, **kwargs): 
    """ 
    Set any number of score, element-name pairs to the key ``name``. Pairs 
    can be specified in two ways: 

    As *args, in the form of: score1, name1, score2, name2, ... 
    or as **kwargs, in the form of: name1=score1, name2=score2, ... 

    The following example would add four values to the 'my-key' key: 
    redis.zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4) 
    """ 
    pieces = [] 
    if args: 
     if len(args) % 2 != 0: 
      raise RedisError("ZADD requires an equal number of " 
          "values and scores") 
     pieces.extend(args) 
    for pair in iteritems(kwargs): 
     pieces.append(pair[1]) 
     pieces.append(pair[0]) 
    return self.execute_command('ZADD', name, *pieces)