2016-10-12 36 views
0

私はCeleryRabbitMQブローカーを使用してDjangoプロジェクトを持っています。そして今、私はNodeJSサーバーからdjango(セロリ)タスクを呼びたいと思います。セロリ(Django)+ RabbitMQ + nodejsサーバー交換データ

私のNodeJSで私はamqplibを使用しています。私はRabbitMQにタスクを送信できるようにどれ:

amqp.connect('amqp://localhost', function(err, conn) { 
    conn.createChannel(function(err, ch) { 
    var q = 'celery'; 

    ch.assertQueue(q, {durable: true}); 
    ch.sendToQueue(q, new Buffer('What should I write here?')); 
    }); 
}); 

私の質問はどのような形式セロリの使用ですか?バッファにセロリを呼び出すにはどうすればいいですか?私CELERY_ROUTES(Djangoの設定)で例えば

、私はblabla.tasks.addを持っている:

CELERY_ROUTES = { 
    ... 
    'blabla.tasks.add': 'high-priority', 
} 

このblabla.tasks.add機能を呼び出す方法?

私は多くの方法を試してみたが、セロリの労働者は私にエラーを与える:Received and deleted unknown message. Wrong destination?!?

答えて

0

私はここに解決策http://docs.celeryproject.org/en/latest/internals/protocol.htmlを見つけました。

例メッセージのフォーマットは次のとおり

{ "id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77", "task": "celery.task.PingTask", "args": [], "kwargs": {}, "retries": 0, "eta": "2009-11-17T12:30:56.527191" }

ように、コードは次のようになります

amqp.connect('amqp://localhost', function(err, conn) { 
    conn.createChannel(function(err, ch) { 
    var q = 'celery'; 

    ch.assertQueue(q, {durable: true}); 
    ch.sendToQueue(q, new Buffer('{"id": "this-is-soo-unique-id", "task": "blabla.tasks.add", "args": [1, 2], "kwargs": {}, "retries": 0}'), { 
     contentType: 'application/json', 
     contentEncoding: 'utf-8', 
    }); 
    }); 
}); 
関連する問題