2011-10-25 11 views
1

私はTwitterのストリーミングAPIへの接続を作成するタスクを(サブクラス化してcelery.task.Taskによって)作成しています。 Twitter API呼び出しの場合、私はtweepyを使用しています。セロリのドキュメンテーションから読んだように、「タスクはすべての要求に対してインスタンス化されるのではなく、グローバルインスタンスとしてタスクレジストリに登録されます。私は、タスクのapply_async(または遅延)を呼び出すたびに、もともとインスタンス化されたが、それは起こらないタスクにアクセスすることを期待していました。代わりに、カスタム・タスク・クラスの新しいインスタンスが作成されます。元のカスタムタスクにアクセスできるようにする必要があります。これは、tweepy API呼び出しによって作成された元の接続を終了できる唯一の方法であるためです。セロリはタスクのいくつかのインスタンスを作成します

from celery import registry 
from celery.task import Task 

class FollowAllTwitterIDs(Task): 
    def __init__(self): 
     # requirements for creation of the customstream 
     # goes here. The CustomStream class is a subclass 
     # of tweepy.streaming.Stream class 

     self._customstream = CustomStream(*args, **kwargs) 

    @property 
    def customstream(self): 
     if self._customstream: 
      # terminate existing connection to Twitter 
      self._customstream.running = False 
     self._customstream = CustomStream(*args, **kwargs) 

    def run(self): 
     self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed() 

     self.customstream.filter(follow=self._to_follow_ids, async=False) 
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name] 

とDjangoビューの

def connect_to_twitter(request): 
    if request.method == 'POST': 
     do_stuff_here() 
     . 
     . 
     . 

     follow_all_twitterids.apply_async(args=[], kwargs={}) 

    return 

を任意の助けをいただければ幸いです:これは役立つだろう場合はここで

は、コードのいくつかの作品です。 :D

EDIT:質問のための追加のコンテキストについて

フィルタ()メソッドが呼び出されるたびに、CustomStreamオブジェクトはhttplib.HTTPSConnectionインスタンスを作成します。この接続は、別の接続を作成しようとするたびに閉じる必要があります。 customstream.runningをFalseに設定すると、接続は閉じられます。

答えて

0

あなたはそれが何らかの理由ではないと思う場合は、タスクが唯一の私は、あなたが

プリント(「インスタンス化」) 輸入トレースバック traceback.print_stack()

を追加提案 、一度インスタンス化されなければなりません

からTask.__init__までの方法があるので、どこでこれが起こっているのかを知ることができます。

私はあなたのタスクは、より良い、このように表現することができると思います:返信用

from celery.task import Task, task 

class TwitterTask(Task): 
    _stream = None 
    abstract = True 

    def __call__(self, *args, **kwargs): 
     try: 
      return super(TwitterTask, self).__call__(stream, *args, **kwargs) 
     finally: 
      if self._stream: 
       self._stream.running = False 

    @property 
    def stream(self): 
     if self._stream is None: 
      self._stream = CustomStream() 
     return self._stream 

@task(base=TwitterTask) 
def follow_all_ids(): 
    ids = get_list_of_ids_to_follow() 
    follow_all_ids.stream.filter(follow=ids, async=false) 
+0

感謝。私は上記を実装しようとし、ちょうどcelery.utils.cached_propertyが使用されているのだろうか? – Christian

+0

それは間違ってそこに追加されたばかりです:私はそれを削除します – asksol

関連する問題