2017-12-18 8 views
1

私はalpakkaを使用して複数のjmsSource(異なるキュー用)を開始するシナリオを持っています。私はまた、キューをいつでも切り離す必要があります。そこで、以下のようにjms akkaストリームにKillSwitchを追加しました: -Akkaはalpakka jmsのKillSwitchをストリームします

trait MessageListener { 

    lazy val jmsPipeline = jmsSource 
    .map { x => log.info(s"Received message ${x} from ${queue}"); x } 
    .viaMat(KillSwitches.single)(Keep.right) 
    .toMat(Sink.foreach { x => pipelineActorRef ! PreProcessorMessage(x) }) 
    (Keep.both) 
    .run() 

    def start(): Unit = { 
      log.info("Invoking listener : {}", queue) 
      jmsPipeline 
      log.info("listener : {} started", queue) 
      } 
    def stop():Unit =  jmsPipeline._1.shutdown() 

    def queue: String 

} 

object ListenerA extends MessageListener { 
    override def queue: String = "Queue_A" 
} 

object ListenerB extends MessageListener { 
    override def queue: String = "Queue_B" 
} 

などと同様です。

アプリケーションを起動すると、すべてのキューが接続され正常に動作します。しかし、私がstopメソッドを使ってキューをデタッチしようとすると、すべてのキューが切断されず、動作がランダムになるわけではありません。また、killSwitchがすべてのリスナーで異なることも確認しました。

ここで何が間違っているのか教えてください。

答えて

0

ログでは、異なるストリームの複数のキューに接続しているように見えますが、複数のストリームが同じキューに接続されている可能性があります。両方のリスナーオブジェクトで、ログにはオーバーライドされたqueueという名前が記録されますが、このキュー名はjmsSourceの構成には使用されません。

jmsSourceの定義は表示されません。明らかにそれはMessageListener形質の外側のどこかで定義されており、その場合にはListenerAListenerBの両方が同じものを使用していますjmsSource。換言すれば、ListenerAListenerBながらjmsPipelineの別個のインスタンスを(キルスイッチが異なっている理由である)、これらjmsPipelineインスタンスの両方をjmsSourceを呼び出すたびに異なるSourceを作成defでない限り、(同じjmsSourceインスタンスから誘導される有しますこれが当てはまる場合でも基本的な問題は残ります:queueは設定で使用されていません)。 Alpakkaで

は、JMSキューがそうjmsSourceはおそらく次のようになり、JmsSourceSettings上に構成されています。

val jmsSource: Source[String, NotUsed] = JmsSource.textSource(
    JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("MyQueue") 
)      // the queue is configured here^

ときListenerA.start()は、例えば、次のように記録され、呼び出されます。

Invoking listener : Queue_A 
listener : Queue_A started 

また、上記のログステートメントの"Queue_A"は、オーバーライドされたdef queue: Stringメンバの値がListenerAです。実際にはjmsSource(上記の例では"MyQueue")に設定されているキューとは限りません。 ListenerBと同じこと、およびmapコンビネータにログインしているというメッセージが表示されます。

簡単な修正はMessageListenerトレイト内jmsSourceとそのJmsSourceSettingsの定義を移動し、実際にそれらの設定にqueueを使用することです。

関連する問題