2016-06-26 1 views
2

akka-streamSourceをwebsocketに接続しようとしています。このようSimple Web Socket Client(SWSC)などのクライアントを使用してSource.actorRefはakka-streamでwebsocketにストリーミングしません

object TestWebServer { 

    val source1 = Source.actorRef[WsMessage](10, OverflowStrategy.dropHead) 
    .map { case [email protected](a,b,c,d,e,f) => println("Received from stream" + msg);TextMessage(c) } 

    import scala.concurrent.duration._ 
    val source2 = Source.tick(initialDelay = 0 second, interval = 1 second, tick = TextMessage("tick")) 



    def main(args: Array[String]) { 

    implicit val system = ActorSystem("my-system") 
    implicit val materializer = ActorMaterializer() 
    // needed for the future flatMap/onComplete in the end 
    implicit val executionContext = system.dispatcher 


    val requestHandler: HttpRequest => HttpResponse = { 
     case [email protected](HttpMethods.GET, Uri.Path("/ws"), _, _, _) => 
     req.header[UpgradeToWebSocket] match { 
      case Some(upgrade) => upgrade.handleMessagesWithSinkSource(Sink.ignore, source1) 
      case None => HttpResponse(400, entity = "Not a valid websocket request!") 
     } 
     case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!") 
    } 


    val bindingFuture = Http().bindAndHandleSync(requestHandler, "localhost", 8080) 

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") 
    StdIn.readLine() // let it run until user presses return 
    bindingFuture 
     .flatMap(_.unbind()) // trigger unbinding from the port 
     .onComplete(_ => system.terminate()) // and shutdown when done 
    } 
} 

、私はsource1upgrade.handleMessagesWithSinkSource(Sink.ignore, source1) でのWebSocketを接続した場合

  • 私はSWSC
  • バックは何も表示されないことがわかりますsource2とwebsocketを接続した場合、 upgrade.handleMessagesWithSinkSource(Sink.ignore, source2)メッセージtickがSVSCコンソールに1秒おきに表示されるのを確認することができます

source1にメッセージを送信すると、メッセージReceived from streamが表示されます。だから私はsource1が正しく設定されていると信じています。

どのようにすればいいですか?source1source2のように動作しますか? source1を接続するために特別なものがありますか?

を更新ありがとう:

  • 私は、コードを更新しました。私は実際にmain以外の2つのソースを宣言して、別のActor Systemからメッセージを送信することができます。 メッセージをsource1に送信するための参照を共有する正しい方法ですか、それともactorSelectionや別のものを使用する必要がありますか?
  • ウラジミールMatveevがmentionnedとして、私が試してみました: はsource1.mapMaterializedValue { ref => ref ! WsMessage(..., "x", ...); ref ! WsMessage(..., "y", ...) }が、私はまだSWSCで更新を確認することはできません。ここ

は私のテストクライアントのコードです:から

object Test { 

    def main(args: Array[String]): Unit = { 

    implicit val system = ActorSystem("my-system2") 
    implicit val materializer = ActorMaterializer() 
    // needed for the future flatMap/onComplete in the end 
    implicit val executionContext = system.dispatcher 

    val source1Client = TestWebServer.source1 
    source1Client.mapMaterializedValue { ref => ref ! WsMessage(DateTime.now(), "x", "xx", 0, 0, 0); ref ! WsMessage(DateTime.now(), "y", "yy", 0, 0, 0) } 

    val source11Client = TestWebServer.source1 
    val actorRefClient = source11.to(Sink.ignore).run() 
    actorRef2 ! WsMessage(DateTime.now(), "x", "xx", 0, 0, 0) 

    } 
} 
  • source1TestSource.actorRefに達しません(source1TestWebServer
  • actorRefClientはコンソールReceived from streamWsMessage(...)
+0

あなたのコードは正しく動作します。正しい俳優にメッセージを送信してもよろしいですか?私は 'mapMaterializedValue {ref => ref! WsMessage(...、 "x"、...); ref! WsMessage(...、 "y"、...)} 'と書いてあり、websocket接続の出力で' x'と 'y'を見ることができます。 –

+0

ウラジミール、私の更新を見てください - ありがとう! – ccheneson

答えて

1

ごめんなさいでプリントアウトTestWebServersource1に達していますが、あなたの更新はあまり役に立ちません。ここでは私のため正常に動作プログラム例はあり、両方とも私が使用している場合アッカ-HTTP WebSocketをクライアントと私はwstaようないくつかの外部ツールを使用する場合:

import java.time.Instant 
import scala.io.StdIn 

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model._ 
import akka.http.scaladsl.model.ws.{TextMessage, UpgradeToWebSocket, WebSocketRequest} 
import akka.stream.{ActorMaterializer, OverflowStrategy} 
import akka.stream.scaladsl.{Flow, Sink, Source} 

case class WsMessage(a: Instant, b: String, c: String, d: Int, e: Int, f: Int) 

object MainServer extends App { 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    implicit val executionContext = actorSystem.dispatcher 

    val source = Source.actorRef[WsMessage](10, OverflowStrategy.dropHead) 
    .map { 
     case [email protected](_, _, c, _, _, _) => 
     println(s"Received from stream: $msg") 
     TextMessage(c) 
    } 
    .mapMaterializedValue { ref => 
     ref ! WsMessage(Instant.now(), "a", "x", 0, 0, 0) 
     ref ! WsMessage(Instant.now(), "b", "y", 0, 0, 0) 
    } 

    val requestHandler: HttpRequest => HttpResponse = { 
    case [email protected](HttpMethods.GET, Uri.Path("/ws"), _, _, _) => 
     req.header[UpgradeToWebSocket] match { 
     case Some(upgrade) => upgrade.handleMessagesWithSinkSource(Sink.ignore, source) 
     case None => HttpResponse(StatusCodes.BadRequest, entity = "Not a valid websocket request!") 
     } 
    case _ => 
     HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!") 
    } 

    val bindingFuture = Http().bindAndHandleSync(requestHandler, "localhost", 8080) 

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") 
    StdIn.readLine() 
    bindingFuture 
    .flatMap(_.unbind()) 
    .onComplete(_ => actorSystem.terminate()) 
} 

object MainClient extends App { 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    implicit val executionContext = actorSystem.dispatcher 

    Http() 
    .singleWebSocketRequest(WebSocketRequest(Uri("ws://localhost:8080/ws")), Flow.fromSinkAndSource(Sink.foreach(println), Source.empty)) 
    Thread.sleep(5000) 
    actorSystem.terminate() 
} 

を私は、このプログラムとあなたとの間の重要な違いが表示されません。このプログラムにはサーバーとクライアントの両方が含まれているため、サーバーを起動してからクライアントを複数回起動することができます。例えば、2つのクライアントを実行した後、ここに私のサーバーの出力は次のとおりです。

Server online at http://localhost:8080/ 
Press RETURN to stop... 
Received from stream: WsMessage(2016-06-28T08:58:21.478Z,a,x,0,0,0) 
Received from stream: WsMessage(2016-06-28T08:58:21.478Z,b,y,0,0,0) 
Received from stream: WsMessage(2016-06-28T08:58:29.925Z,a,x,0,0,0) 
Received from stream: WsMessage(2016-06-28T08:58:29.925Z,b,y,0,0,0) 

そしてここでは、クライアントの出力のいずれかです。私はwsta ws://localhost:8080/wsを実行したときに

TextMessage.Strict(x) 
TextMessage.Strict(y) 

私は同じようなことを参照してください。あなたがSource秒、Sink秒またはFlow Sを宣言どこ

また、それは問題ではありません:彼らはrun()ている場合にのみ、「行動する」ことができます不変の青写真です。

+0

私はあなたのコードを使用すると、今すぐプリントされたメッセージを見ることができます。私のコードとあなたのコードの間に見られる唯一の違いは、あなたのソースがその定義内からメッセージを送信していることです。私のコードでは 'ソース'を取得し、それにメッセージを送ります(私は基本的に外部の俳優に 'ソース'にメッセージを送ることを望みます)。 1つのJVMでWebサーバーを実行して問題が発生する可能性があり、別のJVM(別のプログラム)でテストクライアントを起動します。それは可能性が? – ccheneson

+0

@ccheneson、あなたのテストクライアントは何ですか?上記の例では、別のアプリケーションである 'MainClient'オブジェクトがあります。もちろん、 'MainServer'と一緒に動作することをテストしたとき、別々のJVMで起動されました。 –

+0

@ccheneson、また、ソースアクタをどのように「取得」していますか?私は 'mapMaterializedValue'を使わずにそれをやって、それを' var'のようなもののどこかに格納する方法がありません。 –

関連する問題