2013-10-22 13 views
10

WebSocket経由でリモートAPIとやりとりするコードに取り組んでいます。私のデータレイヤーは、Webソケット接続の確立と監視を担当します。また、送信するwebsocketメッセージをエンキューするためにアプリケーションが使用できるメソッドも含まれています。アプリケーションコードは、websocket接続の状態を検査する責任を負うべきではありません(別名、fire-and-forget)。条件付きでRACSignal値をバッファリングする方法は?

理想的には、私は次のように機能するために、データ層にたい:

  • データ層は、WebSocketのエンドポイント(self.isConnected == NO)への接続を持っていない場合は、メッセージが内部でバッファされています。
  • 接続が利用可能になると(self.isConnected == YES)、バッファされたメッセージが直ちに送信され、後続のメッセージはすぐに送信されます。ここで

は、私が思い付くことができましたものです:

#import "RACSignal+Buffering.h" 

@implementation RACSignal (Buffering) 

- (RACSignal*)bufferWithSignal:(RACSignal*)shouldBuffer 
{ 
    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { 

     RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable]; 

     NSMutableArray* bufferedValues = [[NSMutableArray alloc] init]; 
     __block BOOL buffering = NO; 

     void (^bufferHandler)() = ^{ 
      if (!buffering) 
      { 
       for (id val in bufferedValues) 
       { 
        [subscriber sendNext:val]; 
       } 

       [bufferedValues removeAllObjects]; 
      } 
     }; 

     RACDisposable* bufferDisposable = [shouldBuffer subscribeNext:^(NSNumber* shouldBuffer) { 

      buffering = shouldBuffer.boolValue; 
      bufferHandler(); 

     }]; 

     if (bufferDisposable) 
     { 
      [disposable addDisposable:bufferDisposable]; 
     } 

     RACDisposable* valueDisposable = [self subscribeNext:^(id x) { 

      [bufferedValues addObject:x]; 
      bufferHandler(); 

     } error:^(NSError *error) { 
      [subscriber sendError:error]; 
     } completed:^{ 
      [subscriber sendCompleted]; 
     }]; 

     if (valueDisposable) 
     { 
      [disposable addDisposable:valueDisposable]; 
     } 

     return disposable; 
    }]; 
} 

@end 

最後に、これは、それが使用される方法のための擬似コードです:

@interface APIManager() 

@property (nonatomic) RACSubject* requests; 

@end 

@implementation WebsocketDataLayer 

- (id)init 
{ 
    self = [super init]; 

    if (self) { 

     RACSignal* connectedSignal = RACObserve(self, connected); 

     self.requests = [[RACSubject alloc] init]; 

     RACSignal* bufferedApiRequests = [self.requests bufferWithSignal:connectedSignal]; 

     [self rac_liftSelector:@selector(sendRequest:) withSignalsFromArray:@[bufferedApiRequests]]; 
    } 
    return self; 
} 

- (void)enqueueRequest:(NSString*)request 
{ 
    [self.requests sendNext:request]; 
} 

- (void)sendRequest:(NSString*)request 
{ 
    DebugLog(@"Making websocket request: %@", request); 
} 

@end 

私の質問は次のとおりです。これは値をバッファリングするための正しいアプローチですか?これを扱うRACの慣用的な方法がありますか?

答えて

11

バッファリングが-flattenMap:RACObserveを使用して自然な実装につながる、個々の要求に適用されるものとして考えることができます。

@weakify(self); 
RACSignal *bufferedRequests = [self.requests flattenMap:^(NSString *request) { 
    @strongify(self); 

    // Waits for self.connected to be YES, or checks that it already is, 
    // then forwards the request. 
    return [[[[RACObserve(self, connected) 
     ignore:@NO] 
     take:1] 
     // Replace the property value with our request. 
     mapReplace:request]; 
}]; 

順序が重要な場合、あなたは-map:プラス-concat-flattenMap:を置き換えることができます。これらの実装では、任意のカスタム演算子の必要性を回避し、手動サブスクリプションなしで動作します(これは悪名高いものです)。私は間違っていない場合

+1

'map' +' concat'オプションは、このようなものになります。https://github.com/ReactiveCocoa/ReactiveCocoa/blob/0d98e1058a83e5d7034cdda79fa5ec5f8b0bb780/ReactiveCocoaFramework/ReactiveCocoa/UIRefreshControl%2BRACCommandSupport.m# L42 – allprog

+0

優れた答えです。好奇心のためだけに、これは高スループットのバッファリングにはあまりにも無駄ではありませんか? bufferWithTime:がこのアプローチの観点から実装されていなかった理由はこれですか? – allprog

+1

@allprog '-bufferWithTime:'のような実装よりも少し遅いかもしれませんが、目立たないようにしてください。 'RACObserve'を引き出し、' -replayLast'を使って最適化することができます。 '-bufferWithTime:'は、ピリオドが一旦アップすると(特に複雑さを増やして)、_entire_バッファを送信することを保証しますが、ここではそれほど重要ではありません。 –

0

bufferWithTime:操作で実装されているものとほぼ同じですが、もっと慣用的に実装する既存の操作は考えられません。 (おそらく、これがbufferWithTimeがこのように実装された理由です。)その実装を使用してコードを見直すと、考えなかったいくつかの不具合が明らかになることがあります。

しかし、正直言って、これはあまり難しくありません。トリガー信号が発生すると、出力をバッファしてコンテンツを吐き出すバッファリング操作が必要です。おそらく、ほとんどのバッファリングはこの機能の観点から実装できるので、フレームワークに価値を付加することになります。

関連する問題