2016-06-29 8 views
2

Subjectで公開されている問題のビットがRx.Observable.webSocketで公開されています。 completeの後にWebSocketが再接続されると、ソケット上に来る次のメッセージをプッシュするのではなく、Subjectへのその後のサブスクリプションもすぐに完了します。Rx.Observable.webSocket()は再接続後すぐに完了しますか?

私は、これがどのように機能するのかについて何か基本的なものが欠けていると思います。

ここにはrequirebin/pasteがあります。これは、私が意味することと、私が期待していた振る舞いを少し良く表してくれることを願っています。それは私が見落とした超簡単な何かになると思う。

Requirebin

var Rx = require('rxjs') 

var subject = Rx.Observable.webSocket('wss://echo.websocket.org') 

subject.next(JSON.stringify('one')) 

subject.subscribe(
    function (msg) { 
     console.log('a', msg) 
    }, 
    null, 
    function() { 
     console.log('a complete') 
    } 
) 

setTimeout(function() { 
    subject.complete() 
}, 1000) 

setTimeout(function() { 
    subject.next(JSON.stringify('two')) 
}, 3000) 

setTimeout(function() { 
    subject.next(JSON.stringify('three')) 

    subject.subscribe(
     function (msg) { 
      // Was hoping to get 'two' and 'three' 
      console.log('b', msg) 
     }, 
     null, 
     function() { 
      // Instead, we immediately get here. 
      console.log('b complete') 
     } 
) 
}, 5000) 
+0

あなたは冷たい観測対ホットに対処されるかもしれないような音。 https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#cold-vs-hot-observables – emc

答えて

0

私の代わりにobservable-socketとソケットがクローズされた後の再接続を行うためのコードのビットを選ぶ、Rx.Observable.webSocketを使用していないことになった。

requirebin

const observableSocket = require('observable-socket') 
 
const Rx = require('rxjs') 
 
const EventEmitter = require('events') 
 

 
function makeObservableLoop (socketEmitter, send, receive) { 
 
    socketEmitter.once('open', function onSocketEmit (wSocket) { 
 
     const oSocket = observableSocket(wSocket) 
 
     const sendSubscription = send.subscribe(msg => oSocket.next(msg)) 
 

 
     oSocket.subscribe(
 
      function onNext (msg) { 
 
       receive.next(msg) 
 
      }, 
 

 
      function onError (err) { 
 
       error(err) 
 
       sendSubscription.unsubscribe() 
 

 
       makeObservableLoop(socketEmitter, send, receive) 
 
      }, 
 

 
      function onComplete() { 
 
       sendSubscription.unsubscribe() 
 

 
       makeObservableLoop(socketEmitter, send, receive) 
 
      } 
 
     ) 
 
    }) 
 
} 
 

 
function makeSocketLoop (emitter) { 
 
    const websocket = new WebSocket('wss://echo.websocket.org') 
 

 
    function onOpen() { 
 
    emitter.emit('open', websocket) 
 
    
 
    setTimeout(function() { 
 
     websocket.close() 
 
    }, 5000) 
 
    } 
 

 
    function onClose() { 
 
    makeSocketLoop(emitter) 
 
    } 
 
    
 
    websocket.onopen = onOpen 
 
    websocket.onclose = onClose 
 
} 
 

 
function init (socketEmitter) { 
 
    const _send = new Rx.Subject() 
 
    const _receive = new Rx.Subject() 
 

 
    makeObservableLoop(socketEmitter, _send, _receive) 
 

 
    const send = msg => _send.next(JSON.stringify(msg)) 
 
    const receive = _receive.asObservable() 
 

 
    return { 
 
     send: send, 
 
     read: receive, 
 
    } 
 
} 
 

 
const emitter = new EventEmitter() 
 

 
makeSocketLoop(emitter) 
 
const theSubjectz = init(emitter) 
 

 
setInterval(function() { 
 
    theSubjectz.send('echo, you there?') 
 
}, 1000) 
 

 
theSubjectz.read.subscribe(function (el) { 
 
    console.log(el) 
 
})

2

別のきちんとした解決策は、WebSocketSubjectを超えるラッパーを使用することです。詳細については、

class RxWebsocketSubject<T> extends Subject<T> { 
    private reconnectionObservable: Observable<number>; 
    private wsSubjectConfig: WebSocketSubjectConfig; 
    private socket: WebSocketSubject<any>; 
    private connectionObserver: Observer<boolean>; 
    public connectionStatus: Observable<boolean>; 

    defaultResultSelector = (e: MessageEvent) => { 
    return JSON.parse(e.data); 
    } 

    defaultSerializer = (data: any): string => { 
    return JSON.stringify(data); 
    } 

    constructor(
    private url: string, 
    private reconnectInterval: number = 5000, 
    private reconnectAttempts: number = 10, 
    private resultSelector?: (e: MessageEvent) => any, 
    private serializer?: (data: any) => string, 
    ) { 
    super(); 

    this.connectionStatus = new Observable((observer) => { 
     this.connectionObserver = observer; 
    }).share().distinctUntilChanged(); 

    if (!resultSelector) { 
     this.resultSelector = this.defaultResultSelector; 
    } 
    if (!this.serializer) { 
     this.serializer = this.defaultSerializer; 
    } 

    this.wsSubjectConfig = { 
     url: url, 
     closeObserver: { 
     next: (e: CloseEvent) => { 
      this.socket = null; 
      this.connectionObserver.next(false); 
     } 
     }, 
     openObserver: { 
     next: (e: Event) => { 
      this.connectionObserver.next(true); 
     } 
     } 
    }; 
    this.connect(); 
    this.connectionStatus.subscribe((isConnected) => { 
     if (!this.reconnectionObservable && typeof(isConnected) == "boolean" && !isConnected) { 
     this.reconnect(); 
     } 
    }); 
    } 

    connect(): void { 
    this.socket = new WebSocketSubject(this.wsSubjectConfig); 
    this.socket.subscribe(
     (m) => { 
     this.next(m); 
     }, 
     (error: Event) => { 
     if (!this.socket) { 
      this.reconnect(); 
     } 
     }); 
    } 

    reconnect(): void { 
    this.reconnectionObservable = Observable.interval(this.reconnectInterval) 
     .takeWhile((v, index) => { 
     return index < this.reconnectAttempts && !this.socket 
    }); 
    this.reconnectionObservable.subscribe(
    () => { 
     this.connect(); 
     }, 
     null, 
    () => { 
     this.reconnectionObservable = null; 
     if (!this.socket) { 
      this.complete(); 
      this.connectionObserver.complete(); 
     } 
     }); 
    } 

    send(data: any): void { 
    this.socket.next(this.serializer(data)); 
    } 
} 

次の資料とソースコードを参照してください。
Auto WebSocket reconnection with RxJS
GitHub - Full working rxjs websocket example

関連する問題