2017-10-23 1 views
2

2つの別々のAPIへのHTTPコールを行うプロジェクトがあります。これらの両方のAPIへの呼び出しは、速度を別々に制限する必要があります。私はAPIのいずれかの呼び出しを開始し、これを実現するためにカスタムのExecutionContextを使用しようとしています。ここに私のapplication.confです:特定のExecutionContextでScala WSコールを実行しようとしています

play.modules.enabled += "playtest.PlayTestModule" 

my-context { 
    fork-join-executor { 
    parallelism-min = 10 
    parallelism-max = 10 
    } 
} 

これは、私はそれが動作するかどうかをテストするために使用していScalaのクラスです:

@Singleton 
class MyWsClient @Inject() (client: WSClient, akkaSystem: ActorSystem) { 

    val myExecutionContext: ExecutionContext = akkaSystem.dispatchers.lookup("my-context") 
    val i = new AtomicInteger(0) 

    def doThing: Future[Int] = { 
     Future { 
     println(i.incrementAndGet) 
     println("Awaiting") 
     Await.result(client.url("http://localhost:9000/test").get, Duration.Inf) 
     println("Done") 
     i.decrementAndGet 
     1 
     }(myExecutionContext) 
    } 
} 

しかし、関係なく、私がしようとするもの、並列コール数は超えません私がapplication.confに設定した制限値。私は

Thread.sleep(1000) 

限界が尊重され、レートが適切に制限されているとライン

Await.result(client.url("http://localhost:9000/test").get, Duration.Inf) 

を交換する場合ので、しかし、それは、見知らぬ人にもなります。

私は間違っていますが、どうすれば修正できますか? scala-wsライブラリでレート制限の別の方法がある場合、私はそれを聞いてみたいと思います。

+1

**並列**コールの数が制限を超えていますか? –

+0

@SergeyKovalevはい正しいです。質問内のテキストを変更しました –

+1

ノンブロッキングのことです:要求ごとにスレッドは必要ありません。しかし制限非同期HTTPクライアントの支持率のようなライブラリ:https://github.com/AsyncHttpClient/async-http-client/blob/3e78a04d58ab904fe668d0cf4c09b31ba7437500/extras/guava/src/main/java/org/asynchttpclient/extras/guava /RateLimitedThrottleRequestFilter.java – rethab

答えて

2

私はscala-wsを引き続き使用したいと思いますが、特定のExecutionContextを使用することに頼らないものはどうですか?

これに同意すれば、ここにアイデアがあります... RateLimitedWSClientコンポーネントを作成します。これはWSClientではなくコントローラに注入されます。このコンポーネントはシングルトンで、単一のメソッドdef rateLimit[R](rateLimitClass: String)(request: WSClient => Future[R])をサポートする必要があります。 rateLimitClassは、現在のrequestに適用するratelimitを指定することを目的としています。要求を異なるAPIに異なる割合で制限する必要があると述べたためです。 requestの機能は明らかです。

今の実装のための私の提案は、実際のWSClientを通じてパイプあなたのrequest sがthrottleフローステージ(https://doc.akka.io/docs/akka/current/scala/stream/stages-overview.html#throttle)を使用して、速度制限する一方、単純なアッカ・ストリームを使用することです:

val client: WSClient = ??? // injected into the component 
// component initialization, for example create one flow per API 
val queue = 
    Source 
    .queue[(Promise[_], WSClient => Future[_])](...) // keep this materialized value 
    .throttle(...) 
    .map { (promise, request) => 
    promise.completeWith(request(client)) 
    } 
    .to(Sink.ignore) 
    .run() // You have to get the materialized queue out of here! 

def rateLimit[R](rateLimitClass: String)(request: WSClient => Future[R]): Future[R] = { 
    val result = Promise.empty[R] 
    // select which queue to use based on rateLimitClass 
    if (rateLimitClass == "API1") 
    queue.offer(result -> request) 
    else ??? 
    result.future 
} 

上のコードは大まかで、私はあなたがそのアイデアを得ることを願っています。もちろん、待ち行列を保持する場合は、オーバーフローを処理する方法を決定する必要があります。

関連する問題