私はRabbitMQの上にperlのNet::Stomp
を使用して、非常に単純なクロスプラットフォーム分散ワークキューをリグアップしています。アイデアは、複数のジョブを単一のキュー名にサブミットし、多数のワーカーに仕事を引き出して実行させることです。STOMPキューに接続するサブスクライバはラウンドロビンディストリビューションに追加されません
「ワーカー」を起動してフレーム/ジョブをキューに送信すると、私は予想どおりの結果を得ます。ワーカーはフレームをラウンドロビン方式でキューから取り除きます。
ジョブを永続キューにサブミットしてから、いくつかのワーカーを起動すると、最初のジョブだけがジョブをキューからプルします。これを実行する
# submit.pl
use Net::Stomp;
use JSON::PP;
my $stomp = Net::Stomp->new({ hostname => 'localhost', port => '61613' });
$stomp->connect({ login => 'a', passcode => 'a' }) or die;
for (my $i=0; $i < 20; $i++) {
my $package = encode_json({ seed => $i });
$stomp->send({ destination => '/queue/runs', body => $package });
}
$stomp->disconnect;
# worker.pl
my $stomp = Net::Stomp->new({ hostname => 'localhost', port => '61613' });
$stomp->connect({ login => 'a', passcode => 'a' });
$stomp->subscribe({
destination => '/queue/runs',
ack => 'client',
'activemq.prefetchSize' => 1 });
while (my $frame = $stomp->receive_frame) {
next unless defined $frame;
$stomp->ack({ frame => $frame });
next unless $frame->body;
my $spec = decode_json $frame->body;
say $$ . " " . $spec->{seed};
sleep 1;
}
:私は何を見ることを期待するようなものがある
$ perl submit.pl && perl worker.pl & perl worker.pl
:
30026 0
30024 1
30026 2
30024 3
...
は、代わりに私がキューからフレームを引っ張っのみ最初の "労働者" を参照してください。最初の「作業者」プロセスを終了すると、2番目のフレームがフレームを取得し始めます。
30024 0
30024 1
30024 2
30024 3
...
ワーカーがキューに登録するとすぐに、ラウンドロビン方式でフレームを引き出すことができます。私はむしろこれを処理する明白なものを書く必要はないでしょう。私は既にプロトコルでは、私が見落としているいくつかのメカニズム、またはおそらくNet::Stomp
のバグがあると仮定しますか?
私が言ったように、労働者がsubmit.pl
が実行される前に実行されていれば、ラウンドロビンディスパッチは完全に機能します。