akka-stream
Source
をwebsocketに接続しようとしています。このようSimple Web Socket Client
(SWSC)などのクライアントを使用してSource.actorRefはakka-streamでwebsocketにストリーミングしません
object TestWebServer {
val source1 = Source.actorRef[WsMessage](10, OverflowStrategy.dropHead)
.map { case [email protected](a,b,c,d,e,f) => println("Received from stream" + msg);TextMessage(c) }
import scala.concurrent.duration._
val source2 = Source.tick(initialDelay = 0 second, interval = 1 second, tick = TextMessage("tick"))
def main(args: Array[String]) {
implicit val system = ActorSystem("my-system")
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val requestHandler: HttpRequest => HttpResponse = {
case [email protected](HttpMethods.GET, Uri.Path("/ws"), _, _, _) =>
req.header[UpgradeToWebSocket] match {
case Some(upgrade) => upgrade.handleMessagesWithSinkSource(Sink.ignore, source1)
case None => HttpResponse(400, entity = "Not a valid websocket request!")
}
case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!")
}
val bindingFuture = Http().bindAndHandleSync(requestHandler, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
}
、私はsource1
、 upgrade.handleMessagesWithSinkSource(Sink.ignore, source1)
でのWebSocketを接続した場合
- 私はSWSC
- バックは何も表示されないことがわかります
source2
とwebsocketを接続した場合、upgrade.handleMessagesWithSinkSource(Sink.ignore, source2)
メッセージtick
がSVSCコンソールに1秒おきに表示されるのを確認することができます
source1
にメッセージを送信すると、メッセージReceived from stream
が表示されます。だから私はsource1
が正しく設定されていると信じています。
どのようにすればいいですか?source1
はsource2
のように動作しますか? source1
を接続するために特別なものがありますか?
がを更新ありがとう:
- 私は、コードを更新しました。私は実際に
main
以外の2つのソースを宣言して、別のActor Systemからメッセージを送信することができます。 メッセージをsource1
に送信するための参照を共有する正しい方法ですか、それともactorSelection
や別のものを使用する必要がありますか? - ウラジミールMatveevがmentionnedとして、私が試してみました: は
source1.mapMaterializedValue { ref => ref ! WsMessage(..., "x", ...); ref ! WsMessage(..., "y", ...) }
が、私はまだSWSCで更新を確認することはできません。ここ
は私のテストクライアントのコードです:から
object Test {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("my-system2")
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val source1Client = TestWebServer.source1
source1Client.mapMaterializedValue { ref => ref ! WsMessage(DateTime.now(), "x", "xx", 0, 0, 0); ref ! WsMessage(DateTime.now(), "y", "yy", 0, 0, 0) }
val source11Client = TestWebServer.source1
val actorRefClient = source11.to(Sink.ignore).run()
actorRef2 ! WsMessage(DateTime.now(), "x", "xx", 0, 0, 0)
}
}
source1
Test
はSource.actorRef
に達しません(source1
:TestWebServer
)actorRefClient
はコンソールReceived from streamWsMessage(...)
あなたのコードは正しく動作します。正しい俳優にメッセージを送信してもよろしいですか?私は 'mapMaterializedValue {ref => ref! WsMessage(...、 "x"、...); ref! WsMessage(...、 "y"、...)} 'と書いてあり、websocket接続の出力で' x'と 'y'を見ることができます。 –
ウラジミール、私の更新を見てください - ありがとう! – ccheneson