2017-09-05 5 views
1

Twisted web.Resourceを使用してMJPEGサーバーを実装しようとしています。 はアップストリームのgstreamerプロセスからデータを取得します TCPにMJPEGデータを書き込むポートlocalhost:9999私は今、この のようなものを持っている:Python-twisted:TCPリーダーとWebリソースの間のバッファリングを防止する

from twisted.internet import reactor, protocol, defer 
from twisted.web import server, resource 

class MJpegResource(resource.Resource): 
    def __init__(self, queues): 
     self.queues = queues 

    @defer.inlineCallbacks 
    def deferredRenderer(self, request): 
     q = defer.DeferredQueue() 
     self.queues.append([q, request]) 
     while True: 
      yield q.get() 

    def render_GET(self, request): 
     request.setHeader("content-type", 'multipart/x-mixed-replace; boundary=--spionisto') 
     self.deferredRenderer(request) 
     return server.NOT_DONE_YET 

class JpegStreamReader(protocol.Protocol): 
    def dataReceived(self, data): 
     for (q, req) in self.factory.queues: 
      req.write(data) 
      q.put('') 

root = File('web') 
root.putChild('stream.mjpeg', MJpegResource(queues)) 

factory = protocol.Factory() 
factory.protocol = JpegStreamReader 
factory.queues = queues 
reactor.listenTCP(9999, factory) 

site = server.Site(root) 
reactor.listenTCP(80, site) 

# spawn gstreamer process which writes to port 9999. 
# The gstream process is launched using: 
# gst-launch-1.0 -v \ 
#  v4l2src device=/dev/video0 \ 
#   ! video/x-raw,framerate=15/1, width=640, height=480 \ 
#   ! jpegenc \ 
#   ! multipartmux boundary=spionisto \ 
#   ! tcpclientsink host=127.0.0.1 port=9999 \ 

reactor.run() 

だから、のようなもの:

gstreamer --> JpegStreamReader --> MJpegResource 

これはOK動作しますが、私は時折、 ブラウザ上のビデオは、「これまで何であるか後ろに下がることを発見しましたライブ "(30-40秒程度で )。ブラウザを更新するとすぐに、MJPEGストリームは を「ライブ」に戻します。だから私の疑問は、JpegStreamReaderができないことです。 は、web.http.Requestに対応するTCPソケットに、 として書き込むことができません。gstreamerがTCPソケット9999を埋めていて、JpegStreamReaderへの入力キューにバッファーがついています。

ストリームは「ライブ」となっているので、フレームを廃棄して に戻しても問題ありません。しかし、私は を検出する方法がわからないので、JpegStreamReaderが後ろに落ちていますか? このパイプラインをライブストリームのように振る舞わせる方法についての提案はありますか?

これを行うための基本的なアーキテクチャがある場合は、 の提案も大変ありがとうございます。

答えて

1

Requestオブジェクトにプロデューサを登録することができます。そのRequestの書き込みバッファがいっぱいになったときに呼び出されるpauseProducingメソッドがあります。部屋が利用可能になると、resumeProducingメソッド呼び出しがあります。

この情報を使用して、タイムリーに配信されないフレームをドロップすることができます。しかし、サーバー内のフレームを実際に識別する必要があります(現在は、フレームが開始または終了する場所がわからないストリームとしてデータを渡す方法はdataReceivedしかありません)。これはまた、バッファ充満度がおそらくストリームの遅延の非常に遅い指標であるという問題を有する。また、システムのボトルネックがgstreamerからデータを読み込んでリクエストに書き込む間にない場合、プログラムのこの部分にバックプレッシャの感度を追加することは役に立ちません。

+0

ありがとうございました!私はちょうどあなたの提案を実装し、私はそれが実際に私の問題を解決すべきだと思う。私は今、ビデオストリームをしばらく再生していて、内部ネットワークに余分なネットワークアクティビティがある場合、予想されるフレームが落ちるのを見ています。 最後の解決策は、1秒の遅延でresumeProducingに応答することだけです。 後世のために、私は最後に思いついたコードを含めたいと思います。それを自分で別の答えに入れたり、コメントに入れたりするという条約です。別の回答を私がその回答またはこれを受け入れるべきかどうか? –

2

これはJean-Paul Caleroneの 提案を実装する最終的な解決策です。ここで、PushProducerインターフェイスの を実装するJpegProducerクラスがあることに注目してください。一時停止が要求されると、フラグを設定します。この は、TCPストリームリーダー(JpegStreamReader)が、特定のプロデューサが詰まっている場合、そのフレームを にプッシュしないようにします。 Jean-Paulの提案によれば、 もマルチパートMJPEGストリームを分割して、 が常にMJPEG出力フォーマットを破ることなくフレームをドロップするようにしなければなりませんでした。

from twisted.internet import reactor, protocol, defer, interfaces 
from twisted.web import server, resource 
from zope.interface import implementer 

class MJpegResource(resource.Resource): 
    def __init__(self, queues): 
     self.queues = queues 

    def setupProducer(self, request): 
     producer = JpegProducer(request) 
     request.notifyFinish().addErrback(self._responseFailed, producer) 
     request.registerProducer(producer, True) 

     self.queues.append(producer) 

    def _responseFailed(self, err, producer): 
     producer.stopProducing() 

    def render_GET(self, request): 
     request.setHeader("content-type", 'multipart/x-mixed-replace; boundary=--spionisto') 
     self.setupProducer(request) 
     return server.NOT_DONE_YET 

@implementer(interfaces.IPushProducer) 
class JpegProducer(object): 
    def __init__(self, request): 
     self.request = request 
     self.isPaused = False 
     self.isStopped = False 
     self.delayedCall = None 

    def cancelCall(self): 
     if self.delayedCall: 
      self.delayedCall.cancel() 
      self.delayedCall = None 

    def pauseProducing(self): 
     self.isPaused = True 
     self.cancelCall() 

    def resetPausedFlag(self): 
     self.isPaused = False 
     self.delayedCall = None 

    def resumeProducing(self): 
     # calling self.cancelCall is defensive. We should not really get 
     # called with multiple resumeProducing calls without any 
     # pauseProducing in the middle. 
     self.cancelCall() 
     self.delayedCall = reactor.callLater(1, self.resetPausedFlag) 
     log('producer is requesting to be resumed') 

    def stopProducing(self): 
     self.isPaused = True 
     self.isStopped = True 
     log('producer is requesting to be stopped') 

MJPEG_SEP = '--spionisto\r\n' 

class JpegStreamReader(protocol.Protocol): 
    def __init__(self): 
     self.tnow = None 

    def connectionMade(self): 
     self.data = '' 
     self.tnow = datetime.now() 

    def dataReceived(self, data): 
     self.data += data 

     chunks = self.data.rsplit(MJPEG_SEP, 1) 

     dataToSend = '' 
     if len(chunks) == 2: 
      dataToSend = chunks[0] + MJPEG_SEP 

     self.data = chunks[-1] 

     for producer in self.factory.queues: 
      if (not producer.isPaused): 
       producer.request.write(dataToSend) 
関連する問題