2017-01-05 1 views
0

私はRabbitMQの上にperlのNet::Stompを使用して、非常に単純なクロスプラットフォーム分散ワークキューをリグアップしています。アイデアは、複数のジョブを単一のキュー名にサブミットし、多数のワーカーに仕事を引き出して実行させることです。STOMPキューに接続するサブスクライバはラウンドロビンディストリビューションに追加されません

「ワーカー」を起動してフレーム/ジョブをキューに送信すると、私は予想どおりの結果を得ます。ワーカーはフレームをラウンドロビン方式でキューから取り除きます。

ジョブを永続キューにサブミットしてから、いくつかのワーカーを起動すると、最初のジョブだけがジョブをキューからプルします。これを実行する

# submit.pl 
use Net::Stomp; 
use JSON::PP; 

my $stomp = Net::Stomp->new({ hostname => 'localhost', port => '61613' }); 

$stomp->connect({ login => 'a', passcode => 'a' }) or die; 

for (my $i=0; $i < 20; $i++) { 
    my $package = encode_json({ seed => $i }); 
    $stomp->send({ destination => '/queue/runs', body => $package }); 
} 

$stomp->disconnect; 

# worker.pl 
my $stomp = Net::Stomp->new({ hostname => 'localhost', port => '61613' }); 
$stomp->connect({ login => 'a', passcode => 'a' }); 
$stomp->subscribe({ 
     destination    => '/queue/runs', 
     ack      => 'client', 
     'activemq.prefetchSize' => 1 }); 


while (my $frame = $stomp->receive_frame) { 
    next unless defined $frame; 
    $stomp->ack({ frame => $frame }); 
    next unless $frame->body; 
    my $spec = decode_json $frame->body; 
    say $$ . " " . $spec->{seed}; 
    sleep 1; 
} 

:私は何を見ることを期待するようなものがある

$ perl submit.pl && perl worker.pl & perl worker.pl 

30026 0 
30024 1 
30026 2 
30024 3 
... 

は、代わりに私がキューからフレームを引っ張っのみ最初の "労働者" を参照してください。最初の「作業者」プロセスを終了すると、2番目のフレームがフレームを取得し始めます。

30024 0 
30024 1 
30024 2 
30024 3 
... 

ワーカーがキューに登録するとすぐに、ラウンドロビン方式でフレームを引き出すことができます。私はむしろこれを処理する明白なものを書く必要はないでしょう。私は既にプロトコルでは、私が見落としているいくつかのメカニズム、またはおそらくNet::Stompのバグがあると仮定しますか?

私が言ったように、労働者がsubmit.plが実行される前に実行されていれば、ラウンドロビンディスパッチは完全に機能します。

答えて

0

RabbitmqのSTOMPプラグインのドキュメントに表示されているactivemq.prefetchSizeのキーは何もしません。私がしなければならなかったのは、それを'prefetch-count' => 1に変更して、それがすべて私が期待したとおりに働くことでした。

関連する問題