0

? 私は接続を確立しており、200のステータスコードを受け取っていますが、私はつぶやきを読む方法を知らない。彼らが来るときに私はちょうどprintlnつぶやきをしたいです。PlayframeworkやTwitterのストリーミングAPI

ws.url(url) 
.sign(OAuthCalculator(consumerKey, requestToken)) 
.withMethod("POST") 
.stream() 
.map { response => 
    if(response.headers.status == 200) 
    println(response.body) 
} 

編集:私はあなたにFuture[StreamedResponse]をバック与えstream()を呼び出す

ws.url(url) 
.sign(OAuthCalculator(consumerKey, requestToken)) 
.withMethod("POST") 
.stream() 
.map { response => 
    if(response.headers.status == 200){ 
    response.body 
     .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String) 
     .filter(_.contains("\r\n")) 
     .map(json => Try(parse(json).extract[Tweet])) 
     .runForeach { 
     case Success(tweet) => 
      println("-----") 
      println(tweet.text) 
     case Failure(e) => 
      println("-----") 
      println(e.getStackTrace) 
     } 
    } 
} 

答えて

4

ストリーミングWS要求に対する応答の本体は、バイトのアッカストリームSourceあります。 TwitterのAPIの応答は改行(通常は)で区切られているので、Framing.delimiterを使用してバイトチャンクに分割し、チャンクをJSONに解析して、必要な処理を行うことができます。このような何か作業をする必要があります:

import akka.stream.scaladsl.Framing 
import scala.util.{Success, Try} 
import akka.util.ByteString 
import play.api.libs.json.{JsSuccess, Json, Reads} 
import play.api.libs.oauth.{ConsumerKey, OAuthCalculator, RequestToken} 

case class Tweet(id: Long, text: String) 
object Tweet { 
    implicit val reads: Reads[Tweet] = Json.reads[Tweet] 
} 

def twitter = Action.async { implicit request => 
    ws.url("https://stream.twitter.com/1.1/statuses/filter.json?track=Rio2016") 
     .sign(OAuthCalculator(consumerKey, requestToken)) 
     .withMethod("POST") 
     .stream().flatMap { response => 
    response.body 
     // Split up the byte stream into delimited chunks. Note 
     // that the chunks are quite big 
     .via(Framing.delimiter(ByteString.fromString("\n"), 20000)) 
     // Parse the chunks into JSON, and then to a Tweet. 
     // A better parsing strategy would be to account for all 
     // the different possible responses, but here we just 
     // collect those that match a Tweet. 
     .map(bytes => Try(Json.parse(bytes.toArray).validate[Tweet])) 
     .collect { 
     case Success(JsSuccess(tweet, _)) => tweet.text 
     } 
     // Print out each chunk 
     .runForeach(println).map { _ => 
     Ok("done") 
    } 
    } 
} 

注意:あなたがあなたのコントローラに暗黙のMaterializerを注入する必要がありますストリームを実体化します。

+0

説明 – mkovacek

+0

ためのおかげでそれが可能に後で近い接続ですか?私は別の言葉を追跡するための複数の要求を持っていることを計画しています。 – mkovacek

+1

Akkaドキュメントの[Dynamic Stream Handling](http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html)をご覧ください。一つのアイデア:共有されたkillスイッチを作成し、それを 'source.via(killSwitch.flow)'でストリームに追加する。 killswitchで 'shutdown()'を実行すると、接続が切断されます。 – Mikesname

3

この解決策を見つけました。その中でByteStringチャンクを変換するためにakkaイディオムを使用する必要があります。何かのように:私は上記のコードをテストしていない(しかし、それはhttps://www.playframework.com/documentation/2.5.x/ScalaWSのストリーミング応答部プラスhttp://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.htmlからシンク記述のオフに基づいています)また、これは、それぞれが印刷されますのでご注意

val stream = ws.url(url) 
    .sign(OAuthCalculator(consumerKey, requestToken)) 
    .withMethod("POST") 
    .stream() 

stream flatMap { res => 
    res.body.runWith(Sink.foreach[ByteString] { bytes => 
    println(bytes.utf8String) 
    }) 
} 

ノートチャンクごとに完全なjson blobを返すかどうかは分かりません。チャンクを印刷する前に塊を累積したい場合は、Sink.foldを使用する必要があります。

関連する問題