2016-01-20 20 views
6

ここ数日、Akka StreamsとHTTPを使用してHTTPリソースをファイルにダウンロードする最良の方法を見つけようとしています。Akka StreamsとHTTPを使用してHTTPリソースをファイルにダウンロードする方法は?

は当初、私はFuture-Based Variantで開始し、それがこのようなものに見えた:

def downloadViaFutures(uri: Uri, file: File): Future[Long] = { 
    val request = Get(uri) 
    val responseFuture = Http().singleRequest(request) 
    responseFuture.flatMap { response => 
    val source = response.entity.dataBytes 
    source.runWith(FileIO.toFile(file)) 
    } 
} 

種類の大丈夫だったが、私は純粋アッカストリームに関するより多くのことを学んだ後、私は試してみて、ストリームを作成するためにFlow-Based Variantを使用していましたSource[HttpRequest]から始まる。最初は、私がflatMapConcatフローの変換に遭遇するまで、これは完全に私を困らせました。これは、もう少し詳細な終わった:

def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match { 
    case (responseTry, context) => (responseTry.get, context) 
} 

def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match { 
    case (response, _) => response.entity.dataBytes 
} 

def downloadViaFlow(uri: Uri, file: File): Future[Long] = { 
    val request = Get(uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 
    source. 
    via(requestResponseFlow). 
    map(responseOrFail). 
    flatMapConcat(responseToByteSource). 
    runWith(FileIO.toFile(file)) 
} 

その後、私は少しトリッキー取得し、Content-Dispositionヘッダーを使用していました。

def destinationFile(downloadDir: File, response: HttpResponse): File = { 
    val fileName = response.header[ContentDisposition].get.value 
    val file = new File(downloadDir, fileName) 
    file.createNewFile() 
    file 
} 

def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = { 
    val request = Get(uri) 
    val responseFuture = Http().singleRequest(request) 
    responseFuture.flatMap { response => 
    val file = destinationFile(downloadDir, response) 
    val source = response.entity.dataBytes 
    source.runWith(FileIO.toFile(file)) 
    } 
} 

をしかし、今、私は将来ベースのバリアントでこれを行う方法見当がつかない:

バック・トゥ・ザ・フューチャーベースのバリアントへ行きます。これは、これまで私が得たようである:オリジナルSourceは単一でなければならない理由がないので

def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match { 
    case (response, _) => 
    val source = responseToByteSource(in) 
    val file = destinationFile(downloadDir, response) 
    source.map((_, file)) 
} 

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = { 
    val request = Get(uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 
    val sourceWithDest: Source[(ByteString, File), Unit] = source. 
    via(requestResponseFlow). 
    map(responseOrFail). 
    flatMapConcat(responseToByteSourceWithDest(_, downloadDir)) 
    sourceWithDest.runWith(???) 
} 

は、だから今、私は(私は、各Fileを各Fileための一つ以上の(ByteString, File)要素を放出するSource言っていますHttpRequest)。

これらを受け取り、動的なSinkにルーティングする方法はありますか?だから私はとdownloadViaFlow2を完了できることを

def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ??? 

私は、次のような、flatMapConcatのようなものを考えている

def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = { 
    val sink = FileIO.toFile(destination, true) 
    Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right) 
} 
sourceWithDest.runWithMap { 
    case (_, file) => destToSink(file) 
} 

答えて

5

ソリューションをflatMapConcatを必要としません。あなたが書いたファイルから任意の戻り値を必要としないなら、あなたはSink.foreachを使用することができます。

def writeFile(downloadDir : File)(httpResponse : HttpResponse) : Future[Long] = { 
    val file = destinationFile(downloadDir, httpResponse) 
    httpResponse.entity.dataBytes.runWith(FileIO.toFile(file)) 
} 

def downloadViaFlow2(uri: Uri, downloadDir: File) : Future[Unit] = { 
    val request = HttpRequest(uri=uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 

    source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .runWith(Sink.foreach(writeFile(downloadDir))) 
} 

Sink.foreachwriteFile機能からFuturesを作成すること。したがって背圧はあまりありません。 writeFileはハードドライブによって遅くなる可能性がありますが、ストリームはFuturesを生成し続けます。これを制御するには、Flow.mapAsyncUnordered(またはFlow.mapAsync)を使用することができます。

val parallelism = 10 

source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) 
     .runWith(Sink.ignore) 

をあなたがSink.foldと結合する必要があり、合計カウントに長い値を蓄積する場合:

source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) 
     .runWith(Sink.fold(0L)(_ + _)) 

倍が維持されます実行中の合計であり、要求元が枯渇したときに最終値を出力します。

+0

私はこれよりも良い方法があることを望んでいました。私は、これが実際に正しく動作するかどうかも確信していません。 'writeFile'は、FileIOストリームがマテリアライズされるとすぐに戻ります。レスポンスがチャンクされている場合は、順番にファイルに書き込む必要があります。'mapAsync'の使用に似た問題です。 'append'パラメータも設定する必要があります。また、ファイルへの書き込みエラーがあっても、外部ストリームがエラー信号を受け取ることはないようです。 – Steiny

+1

@Steinyあなたの複数のコメントへの私の答えの打ち切り:(a)正しい、書き込みファイルの戻り値はすぐに返ってきますが、mapAsyncはこれを処理します(b)chunkedsourceを修正できる解決策もなく、元の質問/ (c)追加は、同じファイルに書き込む場合(d)、ファイルの書き込みに失敗した場合に外部ストリームを強制的に強制的に強制終了することは元の質問の一部ではない。あなたは「これを取って動的なシンクに向けるのはとにかくありますか?」と尋ねました。私の回答は**その質問に答えます。あなたのサンプルコードの文脈で私の返答を書いた... –

関連する問題