2011-02-07 12 views
17

非常に長い間、私はAppEngineのタスクキューを使用して、私が想定していた方法でタスクをスケジュールしてきました。AppEngineのユニットテストタスクキュー

しかし、私がいつも疑問に思っていたのは、そのためのテストを書く方法です。今までは、タスクをキューに入れているAPIでエラーが発生していないことを確認した後、タスクを実行するAPIに対してより適切なテストを書き留めていることを確認しました。

は、しかし、最近私はこれで少し不満を感じ始めていると私は実際に正しいタスクが正しいキューに追加されたことをテストする方法を探しています。うまくいけば、これは単にコードをデプロイし、最高のものを期待するよりもうまくいくでしょう。

それが答え上の任意のベアリングを持っている場合、私は、ジャンゴ・nonrelを使用しています。

要約:タスクがキューに入れられたことを確認するために単体テストを書く方法を教えてください。

答えて

13

を使用すると、タスクキューをスタブアウトすることができます。

FunctionalTestCaseまたはTaskQueueTestCaseから継承すると、get_tasksassertTasksInQueueなどのメソッドが表示されます。

実際にタスクを実行することもできます。それを行う方法は、タスクを使用するか、延期するかによって異なります。

from google.appengine.ext import deferred 
import base64 

# gets the most recent task -- since the task queue is reset between tests, 
# this is usually what you want 
def get_task(self): 
    for task in self.get_task_queue_stub().GetTasks('default'): 
     return task 

# decode and execute the deferred method 
def run_deferred(task): 
    deferred.run(base64.b64decode(task['body'])) 

実行中のタスク似ていますが、タスクを取得した後、あなたはWebTestのは(GAEテストベッドの上に構築されている)として提出するために使用する:Deferredの作り方については

、私はこのようないくつかのコードを持っていますPOSTの要求をタスクのURLに渡します。

+0

これはトリックを行うように見えます。ありがとう! –

+0

すべての基本テストケース(https://github.com/jgeewax/gaetestbed/blob/master/gaetestbed/)を使用している場合は、 'self.get_tasks'と' task ['decoded_body'] 'をショートカットとして使用できますtaskqueue.py)。 さらに、これは間もなくgoogle.appengine.ext.testbedの一部になるとうれしいです。 'get_tasks'(https://code.google.com/p/googleappengine/source/browse/trunk/python/google/appengine/api/taskqueue/taskqueue_stub)とよく似た' get_filtered_tasks'メソッドがあります。py#2453) –

+0

私の答えを参照してください:このライブラリはext.testbed(https://developers.google.com/appengine/docs/python/tools/localunittesting)に賛成して非難されています –

1

gaetestbedというテストフレームワークがありますが、これはあなたのニーズに合っています。詳細はhttps://github.com/jgeewax/gaetestbedを参照してください。

このテスト環境は、鼻、鼻-GAEプラグインとWebTestのパッケージに関連して動作します。 GAEアプリケーションをテストする最良の方法は、Pythonパッケージを混在させることです。 GAE Test Bedを使用して

+4

私がマージされてきたように、このパッケージは、実際に廃止されましたgoogle.appengine.ext.testbed(https://developers.google.com/appengine/docs/python/tools/localunittesting)への機能 –

0

SDK 1.4.3Testbed APIローカル統合テストのためのスタブライブラリの簡単な設定を提供します。 Task Queueため

Aサービススタブ可能です。あなたがgoogle.appengine.ext.testbedを使用しての代わりに、GAEテストベッド(GAEテストベッドが廃止され、ext.testbedに移動されている)している場合は、次の操作を行うことができます

18

import base64 
import unittest2 

from google.appengine.ext import deferred 
from google.appengine.ext import testbed 


class TestTasks(unittest2.TestCase): 
    def setUp(self): 
    self.testbed = testbed.Testbed() 
    self.testbed.activate() 
    self.testbed.init_taskqueue_stub() 
    self.taskqueue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME) 

    def tearDown(self): 
    self.testbed.deactivate() 

    def test_send_contact_request(self): 
    # Make the request to your app that "defers" something: 
    response = ... 
    self.assertEqual(response.status_int, 200) 

    # Get the task out of the queue 
    tasks = self.taskqueue_stub.get_filtered_tasks() 
    self.assertEqual(1, len(tasks)) 

    # Run the task 
    task = tasks[0] 
    deferred.run(task.payload) 

    # Assert that other things happened (ie, if the deferred was sending mail...) 
    self.assertEqual(...) 
+1

これは受け入れられた答えでなければなりません。ありがとう – Houman

+0

独自のキューでこれを行うことは可能ですか?私は のdeferred.defer(get_and_store_all_new_messages_async、user_id、query、next_page_token、_queue = "emailFetch") を使用しますが、ユニットテスト用のqueue.yamlファイルを読み込んでいない可能性があるため、キューを認識しません –