2016-10-13 4 views
6

RabbitMQのようなメッセージブローカーは、異なる言語/プラットフォームで書かれたさまざまなアプリケーションが互いに通信することを容易にします。セロリはメッセージブローカーとしてRabbitMQを使うことができるので、たとえプロデューサーがPythonで書かれていなくても、どのアプリケーションからでもタスクをCeleryにキューイングできます。タスクをC#からCeleryにキューイングできますか?

今、私はC#で書かれたアプリケーションからRabbitMQを使ってタスクをセロリにキューイングする方法を理解しようとしています。しかし、そのような例はまだ見つかりませんでした。私が見つけたこれに近い

唯一の情報は受け入れ答えは、JavaからのRabbitMQへのメッセージをキューにセロリのメッセージ・フォーマット・プロトコルを使用することを示唆してthis SO question

です。しかし、答えに示されているリンクには例はなく、メッセージ形式だけがあります。

また、このプロトコルで通信するにはタスクID(UUID)が必要です。 C#アプリケーションはセロリタスクのタスクIDをどのように知っていますか?私が理解しているように、それはタスク名についてのみ知ることができるが、タスクIDについては知ることができない。

答えて

3

このarticleによると、セロリの.Netクライアントは、.Net Frameworkに付属のデフォルトのTaskSchedulerを使用します。これはあなたのタスクのIDを生成する方法を知っています。この記事では、hereの例も示しています。

4

質問がまだ関連しているかどうかわかりませんが、その答えが他の人に役立つことを願っています。

ここで私はタスクをクイーンニングしてCelery example workerに成功しました。

  1. あなたはhereを説明したのRabbitMQのおプロデューサー(クライアント)との間の接続を確立する必要があります。デフォルトのRabbitMQの構成で

    ConnectionFactory factory = new ConnectionFactory(); 
        factory.UserName = username; 
        factory.Password = password; 
        factory.VirtualHost = virtualhost; 
        factory.HostName = hostname; 
        factory.Port = port; 
    
        IConnection connection = factory.CreateConnection(); 
        IModel channel = connection.CreateModel(); 
    

    のみ(127.0.0.1)からローカル接続のために使用することができるだけゲストユーザが存在します。 this質問への回答は、RabbitMQでユーザを定義する方法を説明しています。

  2. 次は、結果を得るためのコールバックの作成です。

    セロリが消費するタスクのメッセージを作成
    var consumer = new EventingBasicConsumer(channel); 
        consumer.Received += (model, ea) => 
        { 
         var ansBody = ea.Body; 
         var ansMessage = Encoding.UTF8.GetString(ansBody); 
         Console.WriteLine(" [x] Received {0}", ansMessage); 
         Console.WriteLine(" [x] Done"); 
        }; 
        channel.BasicConsume(queue: "amq.rabbitmq.reply-to", noAck: true, consumer: consumer); 
    
  3. :この例ではそう答えリスナーがどのように見えるだろう、Direct reply-toを使用している

    IDictionary<string, object> headers = new Dictionary<string, object>(); 
        headers.Add("task", "tasks.add"); 
        Guid id = Guid.NewGuid(); 
        headers.Add("id", id.ToString()); 
    
        IBasicProperties props = channel.CreateBasicProperties(); 
        props.Headers = headers; 
        props.CorrelationId = (string)headers["id"]; 
        props.ContentEncoding = "utf-8"; 
        props.ContentType = "application/json"; 
        props.ReplyTo = "amq.rabbitmq.reply-to"; 
    
        object[] taskArgs = new object[] { 1, 200 }; 
    
        object[] arguments = new object[] { taskArgs, new object(), new object()}; 
    
        MemoryStream stream = new MemoryStream(); 
        DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(object[])); 
        ser.WriteObject(stream, arguments); 
        stream.Position = 0; 
        StreamReader sr = new StreamReader(stream); 
        string message = sr.ReadToEnd(); 
    
        var body = Encoding.UTF8.GetBytes(message); 
    
  4. そして最後に、RabbitMQのにメッセージを公開:

     channel.BasicPublish(exchange: "", 
             routingKey: "celery", 
             basicProperties: props, 
             body: body); 
    
関連する問題