2016-07-01 7 views
0

私は消費者がアイテムをバッチで処理するプロデューサ/コンシューマシミュレーションを作成しようとしています。問題はStore.get()関数は、すぐにそれが呼ばれるようStoreから項目を削除することですが、私はそれは私が呼ばれてきたまでは収量に待機する必要があります。次の出力を生成しSimpy Storeバッチ処理

import simpy 

def producer(env, Q): 
    item = 0 
    while True: 
     yield Q.put(item) 
     print('Submit item:%d'%item) 
     item += 1 

def consumer(env, Q): 
    while True: 
     yield env.timeout(20) 
     events = [Q.get() for i in range(4)] 
     items = yield env.all_of(events) 
     print([items for items in items.values()]) 


env = simpy.Environment() 
maxQD = 2 
Q = simpy.Store(env, capacity=maxQD) 

env.process(producer(env, Q)) 
env.process(consumer(env, Q)) 
env.run(until=500) 

を:

それが成功した4つの項目、および2以上のものを追加することができませんでしプロデューサーを取得するまで、消費者のブロックで

Submit item:0 
Submit item:1 

:maxQDで

Submit item:0 
Submit item:1 
Submit item:2 
Submit item:3 
Submit item:4 
[0, 1, 2, 3] 
Submit item:5 
Submit item:6 
Submit item:7 
Submit item:8 
[4, 5, 6, 7] 
Submit item:9 
Submit item:10 
Submit item:11 
Submit item:12 
[8, 9, 10, 11] 
... 

が2に設定され、私はちょうどを期待しているだろう

import simpy 

def producer(env, Q): 
    item = 0 
    while True: 
     yield Q.put(item) 
     print('Submit item:%d'%item) 
     item += 1 

def consumer(env, Q): 
    while True: 
     yield env.timeout(20) 
     if len(Q.items) >= 4: 
      events = [Q.get() for i in range(4)] 
      items = yield env.all_of(events) 
      print([items for items in items.values()]) 


env = simpy.Environment() 
maxQD = 4 
Q = simpy.Store(env, capacity=maxQD) 

env.process(producer(env, Q)) 
env.process(consumer(env, Q)) 
env.run(until=500) 

しかし、あなたはまだGETは()それは5のように見えるどのもたらす前に項目を削除することをイライラ行動を得る:

あなたは一種のLEN(Q.items)をチェックすることにより、この問題を解決することができます

class BatchGet(simpy.resources.base.Get): 
    def __init__(self, resource, count): 
     self.count = count 
     super().__init__(resource) 

class BatchStore(simpy.resources.store.Store): 
    get = simpy.core.BoundClass(BatchGet) 

    def _do_put(self, event): 
     if len(self.items) + len(event.item) <= self._capacity: 
      self.items.extend(event.item) 
      event.succeed() 

    def _do_get(self, event): 
     count = event.count 
     if len(self.items) >= count: 
      ret = self.items[:count] 
      self.items = self.items[count:] 
      event.succeed(ret) 
:サブクラス化ストアが解決しよう

Submit item:0 
Submit item:1 
Submit item:2 
Submit item:3 
Submit item:4 
[0, 1, 2, 3] 
+0

あなたが観察したのは、意図された動作です。 env.all_of()はすべてのgetイベント*が成功するまで待つのではなく、すべて成功するまで待つ。 env.all_of()に渡す個々のイベントは独立しており、できるだけ早くトリガーされます。したがって、この例のセマンティクスは「ストア内に4つのアイテムがあるまで待つ」ではなく、「最終的にストアから4アイテムが出るまで待つ」という意味です。 –

答えて

1

:アイテムがQに追加された(maxQDが4に変更に注意してください) 210 Putsはリストを取る必要があり(アイテムのバッチなので)、リストを返します。