ここ数日、Akka StreamsとHTTPを使用してHTTPリソースをファイルにダウンロードする最良の方法を見つけようとしています。Akka StreamsとHTTPを使用してHTTPリソースをファイルにダウンロードする方法は?
は当初、私はFuture-Based Variantで開始し、それがこのようなものに見えた:
def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
val request = Get(uri)
val responseFuture = Http().singleRequest(request)
responseFuture.flatMap { response =>
val source = response.entity.dataBytes
source.runWith(FileIO.toFile(file))
}
}
種類の大丈夫だったが、私は純粋アッカストリームに関するより多くのことを学んだ後、私は試してみて、ストリームを作成するためにFlow-Based Variantを使用していましたSource[HttpRequest]
から始まる。最初は、私がflatMapConcat
フローの変換に遭遇するまで、これは完全に私を困らせました。これは、もう少し詳細な終わった:
def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
case (responseTry, context) => (responseTry.get, context)
}
def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match {
case (response, _) => response.entity.dataBytes
}
def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
val request = Get(uri)
val source = Source.single((request,()))
val requestResponseFlow = Http().superPool[Unit]()
source.
via(requestResponseFlow).
map(responseOrFail).
flatMapConcat(responseToByteSource).
runWith(FileIO.toFile(file))
}
その後、私は少しトリッキー取得し、Content-Disposition
ヘッダーを使用していました。
def destinationFile(downloadDir: File, response: HttpResponse): File = {
val fileName = response.header[ContentDisposition].get.value
val file = new File(downloadDir, fileName)
file.createNewFile()
file
}
def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
val request = Get(uri)
val responseFuture = Http().singleRequest(request)
responseFuture.flatMap { response =>
val file = destinationFile(downloadDir, response)
val source = response.entity.dataBytes
source.runWith(FileIO.toFile(file))
}
}
をしかし、今、私は将来ベースのバリアントでこれを行う方法見当がつかない:
バック・トゥ・ザ・フューチャーベースのバリアントへ行きます。これは、これまで私が得たようである:オリジナルSource
は単一でなければならない理由がないので
def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match {
case (response, _) =>
val source = responseToByteSource(in)
val file = destinationFile(downloadDir, response)
source.map((_, file))
}
def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = {
val request = Get(uri)
val source = Source.single((request,()))
val requestResponseFlow = Http().superPool[Unit]()
val sourceWithDest: Source[(ByteString, File), Unit] = source.
via(requestResponseFlow).
map(responseOrFail).
flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
sourceWithDest.runWith(???)
}
は、だから今、私は(私は、各File
を各File
ための一つ以上の(ByteString, File)
要素を放出するSource
言っていますHttpRequest
)。
これらを受け取り、動的なSink
にルーティングする方法はありますか?だから私はとdownloadViaFlow2
を完了できることを
def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ???
:
私は、次のような、flatMapConcat
のようなものを考えている
def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = {
val sink = FileIO.toFile(destination, true)
Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right)
}
sourceWithDest.runWithMap {
case (_, file) => destToSink(file)
}
私はこれよりも良い方法があることを望んでいました。私は、これが実際に正しく動作するかどうかも確信していません。 'writeFile'は、FileIOストリームがマテリアライズされるとすぐに戻ります。レスポンスがチャンクされている場合は、順番にファイルに書き込む必要があります。'mapAsync'の使用に似た問題です。 'append'パラメータも設定する必要があります。また、ファイルへの書き込みエラーがあっても、外部ストリームがエラー信号を受け取ることはないようです。 – Steiny
@Steinyあなたの複数のコメントへの私の答えの打ち切り:(a)正しい、書き込みファイルの戻り値はすぐに返ってきますが、mapAsyncはこれを処理します(b)chunkedsourceを修正できる解決策もなく、元の質問/ (c)追加は、同じファイルに書き込む場合(d)、ファイルの書き込みに失敗した場合に外部ストリームを強制的に強制的に強制終了することは元の質問の一部ではない。あなたは「これを取って動的なシンクに向けるのはとにかくありますか?」と尋ねました。私の回答は**その質問に答えます。あなたのサンプルコードの文脈で私の返答を書いた... –