2011-07-19 11 views
8

AMQP、Websocket、Rubyを使用して簡単なチャットアプリケーションを構築しようとしています。私はこれがAMQPを理解するのに最適なケースではないかもしれないと理解していますが、どこが間違っているのか理解したいと思います。AMQPで動的にキューに登録する

次は私のAMQPサーバコードIは、着信メッセージは、現在進行中のチャットやキューへの加入などの雑用を処理するために私を必要とするステータスメッセージのためのメッセージであるかどうかを示すためにステータスを使用

require 'rubygems' 
require 'amqp' 
require 'mongo' 
require 'em-websocket' 
require 'json' 

class MessageParser 
    # message format => "room:harry_potter, nickname:siddharth, room:members" 
    def self.parse(message) 
    parsed_message = JSON.parse(message) 

    response = {} 
    if parsed_message['status'] == 'status' 
     response[:status] = 'STATUS' 
     response[:username] = parsed_message['username'] 
     response[:roomname] = parsed_message['roomname'] 
    elsif parsed_message['status'] == 'message' 
     response[:status] = 'MESSAGE' 
     response[:message] = parsed_message['message'] 
     response[:roomname] = parsed_message['roomname'].split().join('_') 
    end 

    response 
    end 
end 

class MongoManager 
    def self.establish_connection(database) 
    @db ||= Mongo::Connection.new('localhost', 27017).db(database) 
    @db.collection('rooms') 

    @db 
    end 
end 


@sockets = [] 
EventMachine.run do 
    connection = AMQP.connect(:host => '127.0.0.1') 
    channel = AMQP::Channel.new(connection) 

    puts "Connected to AMQP broker. #{AMQP::VERSION} " 

    mongo = MongoManager.establish_connection("trackertalk_development") 

    EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws| 
    socket_detail = {:socket => ws} 
    ws.onopen do 
     @sockets << socket_detail 

    end 

    ws.onmessage do |message| 

     status = MessageParser.parse(message)   
     exchange = channel.fanout(status[:roomname].split().join('_')) 

     if status[:status] == 'STATUS'    
     queue = channel.queue(status[:username], :durable => true) 

     unless queue.subscribed? 
     puts "--------- SUBSCRIBED --------------" 
     queue.bind(exchange).subscribe do |payload| 
      puts "PAYLOAD : #{payload}" 
      ws.send(payload) 
      end 
     else 
      puts "----ALREADY SUBSCRIBED" 
     end     

     # only after 0.8.0rc14 
     #queue = channel.queue(status[:username], :durable => true)  
     #AMQP::Consumer.new(channel, queue)   

     elsif status[:status] == 'MESSAGE' 
     puts "********************* Message- published ******************************" 
     exchange.publish(status[:message) 
     end     
    end 

    ws.onclose do 
     @sockets.delete ws 
    end 
    end  
end 

です。

私が直面している問題は、私は socket.send(JSON.stringify({status:'message', message:'test', roomname:'Harry Potter'}))

ブラウザにexchange.publish' is called but it still doesn't get pushed via the ws.send`のようなメッセージを送信するときにということです。

EventMachineとAMQPについて私の理解に根本的に何か間違っていますか?ここで

は同じコードhttp://pastie.org/private/xosgb8tw1w5vuroa4w7a

私のコードのためのpastieである私はqueue = channel.queue(status[:username], :durable => true)

からdurable => trueを削除する際に必要に応じて動作しているようです次は私のRailsのスニペットであるユーザーのユーザー名を識別する表示し、ルーム名をWebsockets経由でメッセージの一部として送信します。

durable => trueを削除したときにコードが正常に機能しているように見えますが、なぜメッセージが配信されるのかがわかりません。モンゴーの部分は無視してください。まだ演奏されていません。

私はまた、AMQPおよびその使用方法への私のアプローチは、初見で

<script> 
    $(document).ready(function(){ 
     var username = '<%= @user.email %>'; 
     var roomname = 'Bazingaa'; 

     socket = new WebSocket('ws://127.0.0.1:8080/'); 

     socket.onopen = function(msg){ 
      console.log('connected'); 
      socket.send(JSON.stringify({status:'status', username:username, roomname:roomname})); 
     } 

     socket.onmessage = function(msg){ 
      $('#chat-log').append(msg.data); 

     } 

    }); 

</script> 
<div class='block'> 
    <div class='content'> 
    <h2 class='title'><%= @room.name %></h2> 
    <div class='inner'> 
     <div id="chat-log"> 
     </div> 

     <div id="chat-console"> 
     <textarea rows="5" cols="40"></textarea> 
     </div> 
    </div> 
    </div> 
</div> 

<style> 
    #chat-log{ 
     color:#000; 
     font-weight:bold; 
     margin-top:1em; 
     width:900px; 
     overflow:auto; 
     height:300px; 
    } 
    #chat-console{ 
     bottom:10px; 
    } 

    textarea{ 
     width:100%; 
     height:60px; 
    } 
</style> 
+0

デモとしてamqpコードを実行する場合、誰かが私のコードを本番環境で整理する方法を教えてもらえますか?私のコードを整理するのに役立つサンプルコードは非常に役に立ちます。 – Sid

答えて

1

あなたの問題は、ws.onmessageの呼び出しの間にブローカにキューがハングアップすることがあると思います。クライアントがキューに再接続し、バインディングがすでに存在する場合、ws.send()は呼び出されません。

デフォルトでは、キューとそのバインディングを作成するときに、ブローカが再起動されるまでハングアップしたり、明示的にブローカに削除を指示します。

これを変更するには、2つの方法があります:あなたはキューはブローカーがauto_deleteを追加

  • を再起動しても、周りに固執するようになりますキューを作成するとき

    • 耐久性のあるフラグを追加しますフラグを設定すると、コンシューマが接続されていない短い時間の後にブローカが自動的にエンティティを削除します。

    rabbitmqブローカを使用しているブローカを管理している場合、ブローカで起こっていることをイントロスペクションする簡単な方法は、management pluginをインストールすることです。ブローカの交換、バインディング、およびキューに対するWebインターフェイスを提供します。

  • 0

    正しいかどうAMQPビットはOKのように見えるが、私はすべての設定したくない知っていただきたいと思います依存関係。あなたがAMQPの部分だけで最小限の例を提供すれば、私はそれをチェックします。

    +0

    私のRailsコードでは、ユーザーの電子メールIDとルーム名を識別し、WebSocket経由でAMQPサーバーコードに渡します。 耐久性を削除するとコードが動作するように見えますが、なぜメッセージが配信されるのか理解できません。モンゴーの部分は無視してください。まだ演奏されていません。 AMQPへの私のアプローチとその使用法が正しいかどうかを知りたい – Sid

    関連する問題