2017-02-23 70 views
1

処理後にリモートBLOBストアにアップロードするファイルがたくさんあります。再試行で失敗した場合のキュー処理

現在のところ、フロントエンド(PHP)はこのようなファイルのリストを作成し、それにJobIDという一意のIDを付けます。次に、一意のIDをBeプロセスに渡されたBeanStalkチューブに渡します。 Go workersというライブラリを使用して、各ジョブIDをnet/httpのように処理します。ジョブIDを受信し、redisリストを取得し、ファイルの処理を開始します。

ただし、現在は一度に1つのファイルしか処理されません。ここでの操作はCPU境界ではなくI/O境界であるため、直感はファイルごとにゴルーチンを使用することが有益であることを示唆しています。

しかし、失敗した場合にアップロードを再試行し、ジョブごとに処理されるアイテムの数を追跡したいとします。 1つのジョブに約10KBのファイルを格納でき、そのジョブの100秒をピーク時に1秒間に送信できるため、無制限の数のゴルーチンを開始することはできません。これに対して正しいアプローチは何でしょうか?

NB:私たちは、必要な場合は、ゴルーチンの最大数のサイズにより緩衝chanを使用してゴルーチンの数を制限することができ

答えて

2

(そのような何かのためにbeanstalkdをスワップアウトなど)ビットスタック技術を変更することができますあなたは欲しい。最大容量に達すると、このchanをブロックすることができます。ゴルーチンが終了すると、新しいゴルーチンを実行できるようにスロットを解放します。

例:

package main 

import (
    "fmt" 
    "sync" 
) 

var (
    concurrent = 5 
    semaphoreChan = make(chan struct{}, concurrent) 
) 

func doWork(wg *sync.WaitGroup, item int) { 
    // block while full 
    semaphoreChan <- struct{}{} 

    go func() { 
     defer func() { 
      // read to release a slot 
      <-semaphoreChan 
      wg.Done() 
     }() 
     // This is where your work actually gets done 
     fmt.Println(item) 
    }() 
} 

func main() { 
    // we need this for the example so that we can block until all goroutines finish 
    var wg sync.WaitGroup 
    wg.Add(10) 

    // start the work 
    for i := 0; i < 10; i++ { 
     doWork(&wg, i) 
    } 

    // block until all work is done 
    wg.Wait() 
} 

行く遊び場リンク:このGolang英国の会議に触発さhttps://play.golang.org/p/jDMYuCe7HV

は話:https://youtu.be/yeetIgNeIkc?t=1413

+0

それは同時実行を制限するに始めるために私を助けました。しかし、今もなお残っている問題は、仕事の成否を把握する方法です。ジョブにはN個のサブタスクが含まれています。これらのタスクはすべて正常に処理されなければならず、それ以外のエラーを報告する必要があります。私はこれにどのようにアプローチするのですか? – agathver

+0

ゴルーチンに渡すチャネルを作成します。ゴルーチンは、エラーを含む操作の結果をそのチャンネルに書き込むことができます。呼び出し元は、そのチャネルから情報を引き出して、必要に応じてエラーを処理できます(たとえば、エラーを記録するか、操作を再試行します)。操作を再試行する必要がある場合は、再試行するために必要なコンテキスト(たとえば、ゴルーチンが再試行するために必要な入力)とエラーを持つカスタム構造体タイプのチャネルを作成します。 – MahlerFive

関連する問題