2017-10-20 5 views
0

私はいくつかの段階を実装するためにhttps://blog.golang.org/pipelinesの記事に従っています。golangでは、次のステージの遅延を導入するパイプラインステージを作成する方法はありますか?

イベントがパイプラインの次のステージで渡されるまでに数秒の遅延を導入するステージが必要です。

私の懸念は、以下のコードでは、イベントを渡す前にtime.Sleep()という無限の数のgoルーチンを生成することです。これを行うより良い方法はありますか?

ありがとうございます!

func fooStage(inChan <- chan *Bar) (<- chan *Bar) { 
    out := make(chan *Bar, 10000) 
    go func() { 
     defer close(out) 
     wg := sync.WaitGroup{} 
     for { 
      select { 
      case event, ok := <-inChan: 
       if !ok { 
        // inChan closed 
        break 
       } 
       wg.Add(1) 
       go func() { 
        time.Sleep(5 * time.Second) 
        out <- event 
        wg.Done() 
       }() 
      } 
     } 
     wg.Wait() 
    }() 
    return out 
} 

答えて

1

別のチャネルを使用して、ループが作成できるアクティブなゴルーチンの数を制限することができます。

const numRoutines = 10 

func fooStage(inChan <-chan *Bar) <-chan *Bar { 
    out := make(chan *Bar, 10000) 
    routines := make(chan struct{}, numRoutines) 
    go func() { 
     defer close(out) 
     wg := sync.WaitGroup{} 
     for { 
      select { 
      case event, ok := <-inChan: 
       if !ok { 
        // inChan closed 
        break 
       } 
       wg.Add(1) 
       routines <- struct{}{} 
       go func() { 
        time.Sleep(5 * time.Second) 
        out <- event 
        wg.Done() 
        <-routines 
       }() 
      } 
     } 
     wg.Wait() 
    }() 
    return out 
} 
+0

ありがとう、これは良い考えのようです。私が見ることができる唯一の欠点は、 'routines'チャンネルがブロックされていると、イベントが5秒以上遅れることです。私が思うイベントの中でタイムスタンプを持たずにそれに取り組む良い方法はありません。 – ultimoo

+1

@ultimoo 5秒待っているので、数百から数千のゴルーチンを簡単に実行することができ、実際のイベント待ち時間が短縮されます。このようなものは、コードを読むだけでは決まりません。実際にどのように動作するかを実際に判断するには、テストとベンチマークが必要になります。 – RayfenWindspear

+0

絶対に、これはもっと多くの実験です。私の直感は、これらのgoroutineを数千回実行することはOKです。彼らが行うのは 'time.Sleep()'で実行されるので、彼らの寿命のほとんどはプロセッサ上でスケジュールされません。 – ultimoo

0

あなたはtime.Tickerを使用することができます。

func fooStage(inChan <- chan *Bar) (<- chan *Bar) { 
    //... some code 
    ticker := time.NewTicker(5 * time.Second) 
    <-ticker // the delay, probably need to call twice 
    ticker.Stop() 
    close(ticker.C) 
    //... rest code 
} 
+0

これは後のイベントでどのように動作するのか説明できますか? inChanで2つのイベントが発生した場合、2番目のイベントは10秒間待機しないでしょうか? – ultimoo

+0

'<-ticker'は、指定された期間より頻繁には返されません。やってみなよ –

1

手動ゴルーチンの数を固定してもよい - あなたが必要始まる番号のみを。

func sleepStage(in <-chan *Bar) (out <-chan *Bar) { 
    out = make(<-chan *Bar) 
    wg := sync.WaitGroup 
    for i:=0; i < N; i++ { // Number of goroutines in parallel 
      wg.Add(1) 
      go func(){ 
       defer wg.Done() 
       for e := range in { 
        time.Sleep(5*time.Seconds) 
        out <- e 
       } 
      }() 
     } 
     go func(){} 
      wg.Wait() 
      close(out) 
     }() 
     return out 
    } 
関連する問題