2017-12-03 6 views
1

現在、私は検索エンジンプロジェクトに取り組んでいます。より速いクロールスピードのために、私はリンク訪問ごとに1つのゴルーチンを使用します。しかし私は私に不思議に思う2つの問題に遭遇しました!同時実行性:goroutinesの制限が期待通りに機能しない

まず一つはコードサンプルです:

package main 

import "fmt" 
import "sync" 
import "time" 

type test struct { 
    running int 
    max  int 
    mu  sync.Mutex 
} 

func main() { 
    t := &test{max: 1000} 
    t.start() 
} 

func (t *test) start() { 
    for { 
     if t.running >= t.max { 
      time.Sleep(200 * time.Millisecond) 
      continue 
     } 
     go t.visit() 
    } 
} 

func (t *test) visit() { 
    t.inc() 
    defer t.dec() 
    fmt.Println("visit called") 
    fmt.Printf("running: %d, max: %d\n", t.running, t.max) 
    fmt.Println() 
    time.Sleep(time.Second) 
} 

func (t *test) inc() { 
    t.mu.Lock() 
    t.running++ 
    t.mu.Unlock() 
} 
func (t *test) dec() { 
    t.mu.Lock() 
    t.running-- 
    t.mu.Unlock() 
} 

出力(切り取られた):

running: 2485, max: 1000 

running: 2485, max: 1000 

running: 2485, max: 1000 

visit called 
running: 2485, max: 1000 

running: 2485, max: 1000 

running: 2485, max: 1000 

running: 2485, max: 1000 


visit called 
running: 2485, max: 1000 


running: 2485, max: 1000 

私は明示的にループ内での最大許容ゴルーチンをチェックしていますが、なぜゴルーチンを実行すると、最大値を超えて?


2つ目は、実際のプロジェクトのコードの一部です:

UPDATE:、問題が返すことがかかりすぎたLinkProvider.Get()実装ではこれが実際に固定されているました。 parser.visit()はその間に戻りますが、Parser.Start()のループは新しいリンクを待っています...出力は連続しているようです!

package worker 

import (
    "errors" 
    "fmt" 
    "sync" 
    "time" 

    "bitbucket.org/codictive/ise/components/crawler/models" 
    "bitbucket.org/codictive/ise/components/log/logger" 
    "bitbucket.org/codictive/ise/core/component" 
    "bitbucket.org/codictive/ise/core/database" 
) 

// Worker is a service that processes crawlable links. 
type Worker interface { 
    Start() error 
    Stop() error 
    Restart() error 
    Status() Status 
} 

// Status contains runtime status of a worker. 
type Status struct { 
    Running    bool 
    RunningParsersCount int 
} 

// New return a new defaultWorker with given config. 
func New() Worker { 
    return &defaultWorker{ 
     flow: make(chan bool), 
     stop: make(chan bool), 
    } 
} 

// defaultWorker is a Worker implementation. 
type defaultWorker struct { 
    linkProvider   LinkProvider 
    handlersLimit  int 
    runningHandlersCount int 
    running    bool 
    mu     sync.Mutex 
    flow     chan bool 
    stop     chan bool 
} 

func (w *defaultWorker) init() { 
    prate, _ := component.IntConfig("crawler.crawlInterval") 
    arate, _ := component.IntConfig("crawler.ad_crawlInterval") 
    concLimit, _ := component.IntConfig("crawler.concurrent_workers_limit") 
    w.linkProvider = NewLinkProvider(time.Duration(prate)*time.Hour, time.Duration(arate)*time.Hour) 
    w.handlersLimit = concLimit 
} 

// Start runs worker. 
func (w *defaultWorker) Start() error { 
    logger.Info("Starting crawler worker...") 
    w.running = true 
    w.init() 

    defer func() { 
     w.running = false 
     logger.Info("Worker stopped.") 
    }() 

    for { 
     select { 
     case <-w.stop: 
      w.flow <- true 
      return nil 
     default: 
      fmt.Printf("running: %d limit: %d\n", w.runningHandlersCount, w.handlersLimit) 
      if w.runningHandlersCount >= w.handlersLimit { 
       time.Sleep(200 * time.Millisecond) 
       continue 
      } 

      link := w.linkProvider.Get() 
      if link.ID == 0 { 
       logger.Debug("no link to crawl") 
       time.Sleep(time.Minute) 
       continue 
      } 

      go func(l *models.CrawlLink) { 
       go w.visit(l) 
      }(link) 
     } 
    } 
} 

// Stop stops worker. 
func (w *defaultWorker) Stop() error { 
    logger.Info("Stopping crawler worker...") 
    w.stop <- true 
    select { 
    case <-w.flow: 
     return nil 
    case <-time.After(2 * time.Minute): 
     return errors.New("worker did not stopped properly") 
    } 
} 

// Restart re-starts worker. 
func (w *defaultWorker) Restart() error { 
    logger.Info("Re-starting crawler worker...") 
    w.stop <- true 
    select { 
    case <-w.flow: 
     return w.Start() 
    case <-time.After(2 * time.Minute): 
     return errors.New("can not restart worker") 
    } 
} 

// Status reports current worker status. 
func (w *defaultWorker) Status() Status { 
    return Status{ 
     Running:    w.running, 
     RunningParsersCount: w.runningHandlersCount, 
    } 
} 

func (w *defaultWorker) visit(cl *models.CrawlLink) { 
    w.incrementRunningWorkers() 
    defer w.decrementRunningWorkers() 

    if cl == nil { 
     logger.Warning("[crawler.worker.visit] Can not visit a nil link.") 
     return 
    } 
    if err := cl.LoadFull(); err != nil { 
     logger.Error("[crawler.worker.visit] Can not load link relations. (%v)", err) 
     return 
    } 

    parser := NewParser(cl) 
    if parser == nil { 
     logger.Error("[crawler.worker.visit] Parser instantiation failed.") 
     return 
    } 
    before := time.Now() 
    if err := parser.Parse(); err != nil { 
     cl.Error = err.Error() 
     logger.Error("[crawler.worker.visit] Parser finished with error: %v.", err) 
     db := database.Open() 
     if err := db.Save(&cl).Error; err != nil { 
      logger.Error("[crawler.worker.visit] can not update crawl link. (%v)", err) 
     } 
    } 
    logger.Debug("[crawler.worker.visit] Parsing %q took %s.", cl.URL, time.Since(before)) 
    fmt.Printf("[crawler.worker.visit] Parsing %q took %s.\n", cl.URL, time.Since(before)) 
} 

func (w *defaultWorker) incrementRunningWorkers() { 
    w.mu.Lock() 
    w.runningHandlersCount++ 
    w.mu.Unlock() 
    fmt.Printf("increment called. current: %d\n", w.runningHandlersCount) 
} 

func (w *defaultWorker) decrementRunningWorkers() { 
    w.mu.Lock() 
    w.runningHandlersCount-- 
    w.mu.Unlock() 
    fmt.Printf("decrement called. current: %d\n", w.runningHandlersCount) 
} 

は出力:

2017/12/03 11:24:36 profile: cpu profiling enabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof 
running: 0 limit: 1000 
Running server on :8080 
running: 0 limit: 1000 
increment called. current: 1 
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%81%D8%B1%D8%A7%D8%B4%D8%A8%D9%86%D8%AF/%D8%A7%D9%85%D9%84%D8%A7%DA%A9/%D9%81%D8%B1%D9%88%D8%B4-%D8%A7%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88-%D8%AA%D8%AC%D8%A7%D8%B1%DB%8C" took 370.140513ms. 
decrement called. current: 0 
running: 0 limit: 1000 
increment called. current: 1 
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D8%B3%D8%A7%D9%85%D8%B3%D9%88%D9%86%DA%AF-s3-neo-24252682.html" took 193.193357ms. 
decrement called. current: 0 
running: 0 limit: 1000 
increment called. current: 1 
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%85%DB%8C%D8%B2%D9%88%D8%B5%D9%86%D8%AF%D9%84%DB%8C-%D8%AA%D8%A7%D9%84%D8%A7%D8%B1-22399505.html" took 201.636741ms. 
decrement called. current: 0 
running: 0 limit: 1000 
increment called. current: 1 
[crawler.worker.visit] Parsing "https://www.sheypoor.com/50000%D9%85%D8%AA%D8%B1-%D8%B2%D9%85%DB%8C%D9%86-%D9%85%D8%B1%D8%BA%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88%D8%A7%D9%82%D8%B9-%D8%AF%D8%B1-%D8%AE%D8%B1%D9%85%D8%AF%D8%B1%D9%87-23075331.html" took 210.360596ms. 
decrement called. current: 0 
^C2017/12/03 11:24:43 profile: caught interrupt, stopping profiles 
2017/12/03 11:24:43 profile: cpu profiling disabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof 

として、あなたはvisit方法が完全にシーケンシャルに実行見ることができます!私はちょうどgo visit(link)または上記のコードで使用されているものでそれを呼び出すかどうか。 これはなぜ発生するのですか?何が反復からループを停止していますか?

+0

私はこの問題を発見しました。 'linkProvider.Get'は時間がかかり、新しいリンクが取得される前に訪問が行われます。 – ma3x

答えて

2

私はチャネルを使用して機能をブロックし、この問題を解決するだろう - https://play.golang.org/p/KbYOI1oGNs

主な変更点は、我々は、チャネルguardを持っているということです、ゴルーチンが開始されたとき、私たちはそこに新しい項目を入れて(とサイズが上限に達した場合、それはブロックされます)終了したら解放します。

func (t *test) start() { 
    maxGoroutines := t.max 
    guard := make(chan struct{}, maxGoroutines) 

    for { 
     guard <- struct{}{} 
     go func() { 
      t.visit() 
      <-guard 
     }() 
    } 
} 
+0

ありがとう! 2番目の問題についてアドバイスしますか? – ma3x

関連する問題