AkkaのConsistentHashingRoutingLogic
を使用して、同じキーを持つメッセージが同じアクタにルーティングされることを保証しようとしています。同じキーを持つメッセージがFIFOの順序で処理されることが重要です。異なるキーを持つメッセージは、異なるアクターにルーティングされ、自由に並行して処理されます。私はAkkaを分散モードで使用していません。Akka ConsistentHashingRoutingLogicが同じディスパッチャスレッドに一貫してルーティングしない
メッセージは実際にはRabbitMQブローカから読み取られるJSONメッセージであるため、マスターのアクタはAMQPメッセージを受信し、ルーティングキーをメッセージキーとして使用します。同じキーはメッセージ自体にもあります。アクターはSpringアプリケーションの一部です。
私のマスターの俳優は次のようになります。
@Named("MessageHandlerMaster")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MessageHandlerMaster extends UntypedActor {
private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class);
private Router router;
@Autowired
public MessageHandlerMaster(final SpringProps springProps) {
List<Routee> routees = Stream.generate(() -> {
ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class));
getContext().watch(worker);
return new ActorRefRoutee(worker);
}).limit(5) //todo: configurable number of workers
.collect(Collectors.toList());
router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
}
public void onReceive(Object message) {
if (message instanceof Message) {
Message amqpMessage = (Message) message;
String encoding = getMessageEncoding(amqpMessage);
try {
String json = new String(amqpMessage.getBody(), encoding);
String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey();
log.debug("Routing message based on routing key " + routingKey);
router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender());
} catch (UnsupportedEncodingException e) {
log.warn("Unknown content encoding sent in message! {}", encoding);
}
} else if (message instanceof Terminated) {
//if one of the routee's died, remove it and replace it
log.debug("Actor routee terminated!");
router.removeRoutee(((Terminated) message).actor());
ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class));
getContext().watch(r);
router = router.addRoutee(new ActorRefRoutee(r));
}
}
private static String getMessageEncoding(Message message) {
String encoding = message.getMessageProperties().getContentEncoding();
if ((encoding == null) || (encoding.equals(""))) {
encoding = "UTF-8";
}
return encoding;
}
}
私が最初に一度マスターを取得しています:
this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master");
、その後だけで、それにメッセージを提出する:
master.tell(message, ActorRef.noSender());
しかし、ワーカーのonReceive()
からログを印刷すると、別のディスパッチャスレッドが時には同じキーに使用されます。
マスター・アクターとワーカー・アクターに同じディスパッチャ・スレッドが使用されることがあるのはなぜか分かりません。スレッド間で非同期のメッセージをやり取りするべきではありませんか?
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
あなたがここに見ることができるように、キー10420186を持つ労働者の処理メッセージのディスパッチャスレッドは時々9時には10マスター俳優は時々も、これらの2つのスレッドを使用しました。
ConsistentHashingRoutingLogic
が実際に動作していて、同じスレッドが同じキーでメッセージを処理する方法を確認するにはどうすればよいですか?ルータの初期設定で何か問題がありますか?
私は俳優がスレッドにバインドされていないと思う。添付されたログに間違いはありません。 – vrudkovsk