2016-12-20 20 views
2

私はgrails 3.2.3バージョンとrabbitmq native plugin 3.3.2http://budjb.github.io/grails-rabbitmq-native/doc/manual/)を使用しています。私は次のシナリオを達成しようとしています。 enter image description here
説明:は、私は、ヘッダーと単一のキューに複数のメッセージを送信することだし、消費者のセクションでは、私は特定のフィルタリングによってメッセージを消費する結合適用しようとしました。しかし、消費者はフィルタリングに関係なくすべてのメッセージを消費します。つまり、バインディングが機能していないことを意味します。また、私はrabbitmqの初心者です。だから、どんな助けや方向性も高く評価されています。以下は私のコードです。 application.groovyでGrails rabbitmqネイティブメッセージをコンシューマに送信

キューの設定:関数をキューに送信

rabbitmq { 
    queues = [ 
     [ 
       name  : "mail.queue", 
       connection: "defaultConnection", 
       durable : true 
     ] 
] 

}

sendToQueueにここ

protected void sendToQueue(QueueType queueType, Map message, Map<String, String> binding = null) { 
    rabbitMessagePublisher.send { 
     routingKey = queueType.queueName 
     body = message 
     autoConvert = true 
     if (headers != null) { 
      headers = binding 
     } 
    } 
} 

を私は第3のパラメータはオプション製場合によっては複数のタイプの消費者を必要としません。

キューに送るコール:

sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET.name()]) 
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()]) 

消費者1:

static rabbitConfig = [ 
     queue : QueueType.EMAIL_QUEUE.queueName, 
     binding : ["emailType": EmailType.PASSWORD_RESET.name()], 
     match : "all", 
     consumer: 10 
] 

def handleMessage(Map message, MessageContext context) { 
    print("From PasswordResetEmailConsumer consumer") 
    println(message) 
    passwordResetEmailService.sendPasswordResetMail(message) 
} 

消費者2:R後

static rabbitConfig = [ 
     queue : QueueType.EMAIL_QUEUE.queueName, 
     binding : ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()], 
     match : "all", 
     consumer: 10 
] 

def handleMessage(Map message, MessageContext context) { 
    print("From PasswordResetSuccessEmailConsumer consumer") 
    println(message) 
    passwordResetSuccessEmailService.sendPasswordResetSuccessMail(message) 
} 

答えて

2

rabbitmqのドキュメントを読み返す私は、単一のキューからメッセージを選択的に取り出すことができないことを認識しました。

パブリッシャがルーティングキーと交換するメッセージを公開し、それらのメッセージは、バインドされたキューに配信されます別のオプション"Exchange"はあるが消費者には、キュー

からすべてのメッセージを受け取ります。 More:RabbitMQ Publish/Subscribe Model
基本的な考え方はここにも記載されています:Stackoverflow: RabbitMQ selectively retrieving messages from a queue
とにかく、私の解決策では、複数のキューを必要としませんでした。そこで私は単一のコンシューマを作成し、実際のハンドラクラスBeanリファレンスにメッセージを送り、メッセージをディスパッチしました。実装を共有して、これが誰かに役立つことを願ってください:

アプリケーション内のキュー設定。グルーヴィー:

rabbitmq { 
    queues = [ 
     [ 
       name  : "mail.queue", 
       connection: "defaultConnection", 
       durable : true 
     ] 
    ] 
} 

関数をキューに送信:

protected void sendToQueue(Map message, QueueType queueType, Class<BaseQueueHandler> queueHandlerServiceClass) { 
    message.queueHandlerServiceClass = queueHandlerServiceClass.name 
    rabbitMessagePublisher.send { 
     routingKey = queueType.queueName // queue name from enum: "mail.queue" 
     body = message 
     autoConvert = true 
    } 
} 

ハンドラインタフェース:

interface BaseQueueHandler { 
    void handleMessage(Map message, MessageContext context) 
} 

をキューに送信:

sendToQueue([user: user], QueueType.EMAIL_QUEUE, PasswordResetEmailService.class) 

キューの消費者:

class EchoEmailQueueConsumer { 

    static rabbitConfig = [ 
      queue : QueueType.ECHO_EMAIL_QUEUE.queueName, 
      consumer: 10 
    ] 

    GrailsApplication grailsApplication 

    def handleMessage(Map message, MessageContext context) { 
     String handlerClass = message.remove("queueHandlerServiceClass") 
     Class<BaseQueueHandler> handlerClassType = Class.forName(handlerClass); 
     BaseQueueHandler queueService = grailsApplication.mainContext.getBean(handlerClassType) 
     queueService.handleMessage(message, context) 
    } 

} 

Handlerインタフェースを実装して最後にハンドラサービス:

class PasswordResetEmailService implements BaseQueueHandler { 

    @Override 
    void handleMessage(Map message, MessageContext context) { 
     println("message received in PasswordResetEmailService") 
    } 
} 
関連する問題