2016-04-11 6 views
12

私は、バックグラウンドデータ同期を使用してAndroid用アプリケーションを開発しています。 私は現在、RxJavaを使用して定期的にサーバーにデータを投稿しています。 それ以外の場合は、すぐに同期をトリガーするボタン「強制同期」をユーザーに提供したいと考えています。 Observable.interval()を使用して一定の時間間隔でデータをプッシュする方法を知っていて、Observalbe.just()を強制的に使用する方法を知っていましたが、もしそれが発生した場合は、走るAndroidでRxJavaを使用したキューイングタスク

ここでは、自動同期の間隔が1分の場合を例にして、同期が40秒続くとしましょう(わかりやすいようにここで誇張しています)。万が一、自動で動いているときにユーザーが「強制」ボタンを押した場合(強制的に強制的に実行しているときに自動的にトリガーする)、私は2回目の同期要求を最初のものが終了します。

私はそれにいくつかのより多くの視点を置いてもよい。このイメージを描きました:

enter image description here

あなたが見ることができるように、自動が(いくつかObservable.interval()による)、および同期の途中でトリガされ、ユーザが「強制」ボタンを押す。ここで、最初の要求が完了するのを待ってから、強制的な要求に対して再度開始します。 強制要求が実行されている間に、新しい自動要求が再度トリガされ、キューに追加されました。最後のキューがキューから終了した後、すべてが停止し、その後、少し後で自動スケジューリングが再試行されました。

誰かがこれを行う方法をオペレーターに訂正することを私に指摘することができます。私はObservable.combineLatest()で試しましたが、キューリストは最初に送出されました。キューに新しい同期を追加すると、前の操作が完了しても続かなかったのです。

すべてのヘルプは大幅に、 ダルコ

答えて

11

を高く評価しているあなたはこれを行うことができ、ボタンでタイマーをマージすることで、Observable/SubjectをクリックしonBackpressureBufferのキューイング効果を利用し、1を実行しているを確認します、それにconcatMap処理一度に。

PublishSubject<Long> subject = PublishSubject.create(); 

Observable<Long> periodic = Observable.interval(1, 1, TimeUnit.SECONDS); 

periodic.mergeWith(subject) 
.onBackpressureBuffer() 
.concatMap(new Func1<Long, Observable<Integer>>() { 
    @Override 
    public Observable<Integer> call(Long v) { 
     // simulates the task to run 
     return Observable.just(1) 
       .delay(300, TimeUnit.MILLISECONDS); 
    } 
} 
).subscribe(System.out::println, Throwable::printStackTrace); 

Thread.sleep(1100); 
// user clicks a button 
subject.onNext(-1L); 

Thread.sleep(800); 
+0

[OK]を、それが良い方向だが、私はのカップルを持っています質問... 無名関数はどうなっていますか?私はAPIレベル16のために開発する必要があるため、私はJava 8を使用していません。 また、どのラインのシミュレーションボタンが押されますか? –

+0

ラムダは、観測可能な形式で、同期がどこに起こるかを示します。 subject.onNext()は、ユーザーがボタンをクリックする場所です。 – akarnokd

+0

もう一度質問を追加してください。だから私はこれを[@akarnokd](http://stackoverflow.com/users/61158/akarnokd)として提案し、すべてがうまくいきました。今、チェーンが終了した後に 'onComplete()'コールを取得しようとしています。上記の画像を見ると、最初のチェーンは3回目の同期処理の後に終了します。これは、連鎖がすべての同期の後ではなく終了した後にUIを更新したいのかもしれないからです。 –

3

良い解決策と受け入れ答えは私が行うには別のオプションを共有したいと思いますが存在するが、この使用してスケジューラとSingleThreadExecutor

public static void main(String[] args) throws Exception { 
    System.out.println(" init "); 
    Observable<Long> numberObservable = 
      Observable.interval(700, TimeUnit.MILLISECONDS).take(10); 

    final Subject subject = PublishSubject.create(); 

    Executor executor = Executors.newSingleThreadExecutor(); 
    Scheduler scheduler = Schedulers.from(executor); 
    numberObservable.observeOn(scheduler).subscribe(subject); 

    subject.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"), 
        onCompleteFunc("subscriber 1")); 

    Thread.sleep(800); 
    //simulate action 
    executor.execute(new Runnable() { 
     @Override 
     public void run() { 
      subject.onNext(333l); 
     } 
    }); 

    Thread.sleep(5000); 
} 

static Action1<Long> onNextFunc(final String who) { 
    return new Action1<Long>() { 
     public void call(Long x) { 
      System.out.println(who + " got " + x + " :: " + Thread.currentThread().getName() 
        + " -- " + System.currentTimeMillis()); 
      try { 
       //simulate some work 
       Thread.sleep(500); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    }; 
} 

static Action1<Throwable> onErrorFunc(final String who) { 
    return new Action1<Throwable>() { 
     public void call(Throwable t) { 
      t.printStackTrace(); 
     } 
    }; 
} 

static Action0 onCompleteFunc(final String who) { 
    return new Action0() { 
     public void call() { 
      System.out.println(who + " complete"); 
     } 
    }; 
} 
関連する問題