2016-10-06 1 views
2

私は現在、api-callsをたくさん組み合わせる必要があるため、非同期イベントを処理する方が便利なため、rxjavaを初めて使用していますアンドロイドのライフサイクル。Androidでリアクティブエクステンションを使用して進捗状況を更新するファイルをダウンロードするには

しかし、アプリケーションでは、zipアーカイブにパッケージ化された静的データセットを読み込む必要があることもあります。私はrxを一貫性のためにこの操作にも使用しようとしますが、うまくいきますが、進捗状況を購読してUIをファイルのダウンロード進捗状況に更新する方法を本当に把握していません。

これは私がokhttp-ライブラリーを利用したファイルをダウンロードするために今使用していますコードです:

downloadService.downloadFile(filename) 
    .flatMap(new Func1<Response<ResponseBody>, Observable<File>>() 
    { 
     @Override 
     public Observable<File> call(final Response<ResponseBody> responseBodyResponse) 
     { 
      return Observable.create(new Observable.OnSubscribe<File>() 
      { 
       @Override 
       public void call(Subscriber<? super File> subscriber) 
       { 
        try 
        { 
         File file = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), filename); 
         BufferedSink sink = Okio.buffer(Okio.sink(file)); 
         sink.writeAll(responseBodyResponse.body().source()); 
         sink.close(); 
         subscriber.onNext(file); 
         subscriber.onCompleted(); 
        } 
        catch (IOException e) 
        { 
         e.printStackTrace(); 
         subscriber.onError(e); 
        } 
       } 
      }); 
     } 
    }) 
    .subscribeOn(Schedulers.io()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(new Observer<File>() 
    { 
     @Override 
     public void onCompleted() 
     { 
      Log.d("downloadZipFile", "onCompleted"); 
     } 

     @Override 
     public void onError(Throwable e) { 
      e.printStackTrace(); 
      Log.d("downloadZipFile", "Error " + e.getMessage()); 
     } 

     @Override 
     public void onNext(File file) { 
      Log.d("downloadZipFile", "File downloaded to " + file.getAbsolutePath()); 
     } 
    }); 

イベントを進行するサブスクリプションを実装するための良い方法は何ですか?

答えて

5

まず最初に、Observable.create()を使用しないでください。あなたが何をしているのかを理解し、Observableが処理する必要があるすべての背圧を処理する準備ができている場合を除きます。

public Observable<DownloadProgress<File>> downloadFile(@NonNull final String filename) { 
    return downloadService.downloadFile(filename) 
      .switchMap(response -> Observable.fromEmitter(emitter -> { 
       ResponseBody body = response.body(); 
       final long contentLength = body.contentLength(); 
       ForwardingSource forwardingSource = new ForwardingSource(body.source()) { 
        private long totalBytesRead = 0L; 

        @Override 
        public long read(Buffer sink, long byteCount) throws IOException { 
         long bytesRead = super.read(sink, byteCount); 
         // read() returns the number of bytes read, or -1 if this source is exhausted. 
         totalBytesRead += bytesRead != -1 ? bytesRead : 0; 
         boolean done = bytesRead == -1; 
         float progress = done ? 1f : (float) bytesRead/contentLength; 
         emitter.onNext(new DownloadProgress<>(progress)); 
         return bytesRead; 
        } 
       }; 
       emitter.setCancellation(body::close); 
       try { 
        File saveLocation = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), filename); 
        saveLocation.getParentFile().mkdirs(); 
        BufferedSink sink = Okio.buffer(Okio.sink(saveLocation)); 
        sink.writeAll(forwardingSource); 
        sink.close(); 
        emitter.onNext(new DownloadProgress<>(saveLocation)); 
        emitter.onCompleted(); 
       } catch (IOException e) { 
        emitter.onError(e); 
       } 
      }, Emitter.BackpressureMode.LATEST)); 
} 

fromEmitter()はRxJava、現在実験が、作品に最近追加された:あなたはこのような何かを行うことができます次に

public class DownloadProgress<DATA> { 
    private final float progress; 
    private final DATA data; 

    public DownloadProgress(float progress) { 
     this.progress = progress; 
     this.data = null; 
    } 

    public DownloadProgress(@NonNull DATA data) { 
     this.progress = 1f; 
     this.data = data; 
    } 

    public float getProgress() { 
     return progress; 
    } 

    public boolean isDone() { 
     return data != null; 
    } 

    public DATA getData() { 
     return data; 
    } 
} 

あなたの進捗情報を保持するクラスを作成することから始めた

非常によく(最近改名され、以前はfromAsync()と呼ばれていました)。

あなたは、このようにそれを使用することができます:

String filename = "yourFileName"; 
    downloadFile(filename) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .doOnNext(fileDownloadProgress -> { 
       float progress = fileDownloadProgress.getProgress(); 
       // TODO update UI 
      }) 
      .filter(DownloadProgress::isDone) 
      .map(DownloadProgress::getData) 
      .subscribe(file -> { 
       // file downloaded 
      }, throwable -> { 
       // error 
      }); 

これは動作するはずです(私はそれをテストしていませんでした)しかし、あなたが退会場合は、継続的なHTTP呼び出しをキャンセルすることはできません。それについては

public Observable<DownloadProgress<File>> downloadFile(@NonNull final String url, @NonNull final File saveLocation) { 
    return Observable.fromEmitter(emitter -> { 
     Request request = new Request.Builder() 
       .url(url) 
       .build(); 

     final ProgressListener progressListener = (bytesRead, contentLength, done) -> { 
      // range [0,1] 
      float progress = done ? 1f : (float) bytesRead/contentLength; 
      emitter.onNext(new DownloadProgress<>(progress)); 
     }; 

     OkHttpClient client = new OkHttpClient.Builder() 
       .addNetworkInterceptor(chain -> { 
        Response originalResponse = chain.proceed(chain.request()); 
        return originalResponse.newBuilder() 
          .body(new ProgressResponseBody(originalResponse.body(), progressListener)) 
          .build(); 
       }) 
       .build(); 

     final Call call = client.newCall(request); 
     emitter.setCancellation(() -> call.cancel()); 

     try { 
      Response response = call.execute(); 
      BufferedSink sink = Okio.buffer(Okio.sink(saveLocation)); 
      sink.writeAll(response.body().source()); 
      sink.close(); 
      emitter.onNext(new DownloadProgress<>(saveLocation)); 
      emitter.onCompleted(); 
     } catch (IOException e) { 
      emitter.onError(e); 
     } 
    }, Emitter.BackpressureMode.LATEST); 
} 

あなたも、この必要があります:

public static class ProgressResponseBody extends ResponseBody { 

    private final ResponseBody responseBody; 
    private final ProgressListener progressListener; 
    private BufferedSource bufferedSource; 

    public ProgressResponseBody(ResponseBody responseBody, ProgressListener progressListener) { 
     this.responseBody = responseBody; 
     this.progressListener = progressListener; 
    } 

    @Override 
    public MediaType contentType() { 
     return responseBody.contentType(); 
    } 

    @Override 
    public long contentLength() { 
     return responseBody.contentLength(); 
    } 

    @Override 
    public BufferedSource source() { 
     if (bufferedSource == null) { 
      bufferedSource = Okio.buffer(source(responseBody.source())); 
     } 
     return bufferedSource; 
    } 

    private Source source(Source source) { 
     return new ForwardingSource(source) { 
      long totalBytesRead = 0L; 

      @Override 
      public long read(Buffer sink, long byteCount) throws IOException { 
       long bytesRead = super.read(sink, byteCount); 
       // read() returns the number of bytes read, or -1 if this source is exhausted. 
       totalBytesRead += bytesRead != -1 ? bytesRead : 0; 
       progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1); 
       return bytesRead; 
      } 
     }; 
    } 
} 

interface ProgressListener { 
    void update(long bytesRead, long contentLength, boolean done); 
} 

使用方法:

をあなたのダウンロードサービスを変更することができる場合

私はむしろレシピ例hereに触発され、OkHttpを通してそれを扱うでしょう

String filename = "yourFileName"; 
    String url = "http://your.url.here"; 
    File saveLocation = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), filename); 
    downloadFile(url, saveLocation) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .doOnNext(fileDownloadProgress -> { 
       float progress = fileDownloadProgress.getProgress(); 
       // TODO update UI 
      }) 
      .filter(DownloadProgress::isDone) 
      .map(DownloadProgress::getData) 
      .subscribe(file -> { 
       // file downloaded 
      }, throwable -> { 
       // error 
      }); 
+0

このような詳細な例を提供するために時間を割いていただきありがとうございます。ありがとうございました。 – Markus

+0

私はそれが動作するかどうか、私はそれをテストしていないことを教えてください –

+0

@マーカスは、上記の答えのための作業例を投稿することができます。それは本当に役立つだろう。 – Aks4125

関連する問題