2016-10-20 14 views
0

AkkaのConsistentHashingRoutingLogicを使用して、同じキーを持つメッセージが同じアクタにルーティングされることを保証しようとしています。同じキーを持つメッセージがFIFOの順序で処理されることが重要です。異なるキーを持つメッセージは、異なるアクターにルーティングされ、自由に並行して処理されます。私はAkkaを分散モードで使用していません。Akka ConsistentHashingRoutingLogicが同じディスパッチャスレッドに一貫してルーティングしない

メッセージは実際にはRabbitMQブローカから読み取られるJSONメッセージであるため、マスターのアクタはAMQPメッセージを受信し、ルーティングキーをメッセージキーとして使用します。同じキーはメッセージ自体にもあります。アクターはSpringアプリケーションの一部です。

私のマスターの俳優は次のようになります。

@Named("MessageHandlerMaster") 
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) 
public class MessageHandlerMaster extends UntypedActor { 

    private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class); 

    private Router router; 

    @Autowired 
    public MessageHandlerMaster(final SpringProps springProps) { 

    List<Routee> routees = Stream.generate(() -> { 
     ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class)); 
     getContext().watch(worker); 
     return new ActorRefRoutee(worker); 
    }).limit(5) //todo: configurable number of workers 
     .collect(Collectors.toList()); 

    router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees); 
    } 

    public void onReceive(Object message) { 
    if (message instanceof Message) { 
     Message amqpMessage = (Message) message; 
     String encoding = getMessageEncoding(amqpMessage); 
     try { 
     String json = new String(amqpMessage.getBody(), encoding); 
     String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey(); 
     log.debug("Routing message based on routing key " + routingKey); 
     router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender()); 
     } catch (UnsupportedEncodingException e) { 
     log.warn("Unknown content encoding sent in message! {}", encoding); 
     } 
    } else if (message instanceof Terminated) { 
     //if one of the routee's died, remove it and replace it 
     log.debug("Actor routee terminated!"); 
     router.removeRoutee(((Terminated) message).actor()); 
     ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class)); 
     getContext().watch(r); 
     router = router.addRoutee(new ActorRefRoutee(r)); 
    } 
    } 

    private static String getMessageEncoding(Message message) { 
    String encoding = message.getMessageProperties().getContentEncoding(); 
    if ((encoding == null) || (encoding.equals(""))) { 
     encoding = "UTF-8"; 
    } 
    return encoding; 
    } 
} 

私が最初に一度マスターを取得しています:

this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master"); 

、その後だけで、それにメッセージを提出する:

master.tell(message, ActorRef.noSender()); 

しかし、ワーカーのonReceive()からログを印刷すると、別のディスパッチャスレッドが時には同じキーに使用されます。

マスター・アクターとワーカー・アクターに同じディスパッチャ・スレッドが使用されることがあるのはなぜか分かりません。スレッド間で非同期のメッセージをやり取りするべきではありませんか?

16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186 
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186 

あなたがここに見ることができるように、キー10420186を持つ労働者の処理メッセージのディスパッチャスレッドは時々9時には10マスター俳優は時々も、これらの2つのスレッドを使用しました。

ConsistentHashingRoutingLogicが実際に動作していて、同じスレッドが同じキーでメッセージを処理する方法を確認するにはどうすればよいですか?ルータの初期設定で何か問題がありますか?

+1

私は俳優がスレッドにバインドされていないと思う。添付されたログに間違いはありません。 – vrudkovsk

答えて

0

@vrudkovskは彼のコメントで正しいです。あなたはスレッドとアクターの間で混乱していると思います。アクターは、アドレスとメールボックスを持つメモリ内の単なるオブジェクトです。ディスパッチャは、基本的に、アクタとアクションを実行するスレッドプールです。例のアクションは次のとおりです。俳優

  • でそれを処理するために、メールボックスからメッセージをデキュー

    • は、メールボックスにメッセージをエンキュー。

    異なるスレッドは、同じアクタに対してアクションを実行できます。それはディスパッチャーによって決定されます。 Akkaは、一度に1つのスレッドだけがアクター内のメッセージを処理することを保証します。それが常に同じスレッドになるというわけではありません。

    同じアクターに確実にアクセスしたい場合は、context.self.pathまたはcontext.self.path.addressを使用してアクターパスまたはアドレスを記録することをお勧めします。これらは同じActorSystem内の一意のIDであるためです。

  • +0

    あなたの答えをありがとう。したがって、このケースでは、「ConsistentHashingRoutingLogic」を使用しているため、アクター1にどのスレッドが使用されているかに関係なく、5人のWorkerアクターがいる場合、2番目のディスパッチャースレッドがピックアップすることはありません前のメッセージが処理される前にアクタ1にルーティングされるべき別のメッセージ? – jbx

    +0

    これは正しいです。 'ConsistentHashingRoutingLogic'は、どのスレッドが作業をしているかにかかわらず、あなたのメッセージが正しいアクターになるようにします。 – hveiga

    関連する問題