2016-11-10 9 views
1

以下のconsumer-2がどのように 'null'を消費しているか説明できますか?私のコードはこれを防ぐべきです。単一のプロデューサ複数のコンシューマ - キューにはNULLが含まれています

public class Test { 

public static void main(String args[]) throws InterruptedException { 

    BoundedQueue<Integer> sharedQueue = new BoundedQueue<>(10); 

    Callable<Integer> producer1 = new Producer(sharedQueue, "producer-1"); 
    //Callable<Integer> producer2 = new Producer(sharedQueue, "producer-2"); 
    Callable<Integer> consumer1 = new Consumer(sharedQueue, "consumer-1"); 
    Callable<Integer> consumer2 = new Consumer(sharedQueue, "consumer-2"); 

    Collection<Callable<Integer>> callables = new HashSet<>(); 
    callables.add(producer1); 
    //callables.add(producer2); 
    callables.add(consumer1); 
    callables.add(consumer2); 

    ExecutorService executorService = Executors.newFixedThreadPool(10); 
    executorService.invokeAll(callables); 
} 
} 

public class BoundedQueue<T> { 

private int capacity; 
private int head; 
private int tail; 
private int currentSizeOfBuffer; 
private T[] buffer; 

private final ReentrantLock lock = new ReentrantLock(); 
private final Condition notFull = lock.newCondition(); 
private final Condition notEmpty = lock.newCondition(); 

public BoundedQueue(int capacity) { 
    this.capacity = capacity; 
    this.buffer = (T[]) new Object[capacity]; 
} 

public void put(T element) throws InterruptedException { 

    final ReentrantLock lock = this.lock; 
    lock.lock(); 

    if(isBufferFull()) { 
     waitOnAvailableSlot(); 
    } 

    try { 
     buffer[tail] = element; 
     tail = getNextAvailableSlot(tail); 
     currentSizeOfBuffer++; 

     informConsumerQueueHasElement(); 

    } finally { 
     lock.unlock(); 
    } 
} 

private boolean isBufferFull() { 
    return capacity == currentSizeOfBuffer; 
} 

private void waitOnAvailableSlot() throws InterruptedException { 
    notFull.await(); 
} 

private void informConsumerQueueHasElement() { 
    notEmpty.signal(); 
} 

public T take() throws InterruptedException { 

    final ReentrantLock lock = this.lock; 
    lock.lock(); 

    if(isBufferEmpty()) { 
     waitOnAvailableElement(); 
    } 

    try { 
     T element = buffer[head]; 
     head = getNextAvailableSlot(head); 
     currentSizeOfBuffer--; 

     informProducerQueueHasSpaceAvailable(); 

     return element; 
    } finally { 
     lock.unlock(); 
    } 
} 

private boolean isBufferEmpty() { 
    return 0 == currentSizeOfBuffer; 
} 

private void waitOnAvailableElement() throws InterruptedException { 
    notEmpty.await(); 
} 

private void informProducerQueueHasSpaceAvailable() { 
    notFull.signal(); 
} 

private final int getNextAvailableSlot(int currentSlotPosition) { 
    int nextAvailableSlot = ++currentSlotPosition; 
    return (nextAvailableSlot == capacity) ? 0 : nextAvailableSlot; 
} 
} 


public class Producer implements Callable<Integer> { 

private final BoundedQueue sharedQueue; 
private String name; 

@Override 
public Integer call() throws Exception { 

    for(int i=0; i<10; i++){ 
     try { 
      sharedQueue.put(i); 
      System.out.println(name + " produced: " + i); 
     } catch (InterruptedException ex) { 
      Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 
    return null; 
} 

public Producer(BoundedQueue sharedQueue, String name) { 
    this.sharedQueue = sharedQueue; 
    this.name = name; 
} 
} 

public class Consumer implements Callable<Integer> { 

private final BoundedQueue sharedQueue; 
private String name; 

@Override 
public Integer call() throws Exception { 

    while(true){ //what is happening here? 
     try { 
      Integer element = (Integer) sharedQueue.take(); 
      System.out.println(name + " consumed: "+ element); 
     } catch (InterruptedException ex) { 
      Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 
} 

public Consumer(BoundedQueue sharedQueue, String name) { 
    this.sharedQueue = sharedQueue; 
    this.name = name; 
} 
} 

出力:

  • プロデューサ-2産:0
  • 消費者-2消費:ヌル
  • 消費者1が消費:0
  • プロデューサ-2産:1
  • producer-2 produced:2
  • consumer-2 consumed:2
  • 消費者1が消費:0
  • プロデューサ-1産:0
  • 消費者-2消費:3
  • など

別ラン:

  • プロデューサ-2産: 0
  • 消費者-1消費:0
  • 消費者2消費:ヌル
  • プロデューサ-1産:0
  • roducer-2産:1
  • プロデューサ-1産:1
  • 消費者-2消費:0
  • 消費者1が消費:ヌル
  • 消費者-2消費:2
  • など

答えて

1

ifの代わりにwhile(isBufferEmpty())を使用する必要があります(フルの場合も同じです)。すべてのコンシューマ(およびプロデューサ)が同時にシグナルを受け取るので、他のキューがキューに追加された要素をまだ処理していないことを確認するために再確認する必要があります。

+0

もちろん、それが指摘されている時は明らかです。 – TheCoder

関連する問題