2016-05-23 5 views
2

ArrayBlockingQueueに機能を追加しようとしています。具体的には、キューが既にキューに含まれている場合にエントリをエンキューしないというような、ユニークな要素を保持するだけです。 私が望む機能はJCIPの4.4項のVectorの拡張と同じなので、そこでのアプローチを使って実装しようとしました。 ArrayBlockingQueueを継承したクラスとして、私はそれへの参照を取得できませんでしたので、パッケージプライベートReentrantLockのを使用して、相互排他を実装しているため拡張することでArrayBlockingQueueに機能を追加する

  • 実装は、動作しません。たとえそれが機能したとしても、これは脆弱なアプローチになります。
  • クライアント側のロックをサポートしていないため、クライアント側のロックによる実装は機能しません。 ArrayBlockingQueueを構成するとき
  • は、組成物による

    実装は、このような

    public class DistinctBlockingQueue<E> implements BlockingQueue<E> { 
        private final BlockingQueue<E> backingQueue; 
    
        public DistinctBlockingQueue(BlockingQueue<E> backingQueue) { 
         this.backingQueue = backingQueue; 
        } 
    
        @Override 
        public synchronized boolean offer(E e) { 
         if (backingQueue.contains(e)) { 
          return false; 
         } 
    
         return backingQueue.offer(e); 
        } 
    
        @Override 
        public synchronized E take() throws InterruptedException { 
         return backingQueue.take(); 
        } 
    
        // Other methods... 
    } 
    

    としてコードを生成する、最初に移動するための方法のように思えた残念ながら、このアプローチは、以下のような単純なシナリオでは、デッドロックが得られます。

    1. スレッドAはtake()を呼び出し、同期ロックとArrayBlockingQueueの内部ロックを取得します。
    2. スレッドAは、キューが空であり、ArrayBlockingQueueの内部ロックを解放していることを見てブロックします。
    3. スレッドBは、要素を持つoffer()を呼び出しますが、同期ロックを取得することはできず、永久にブロックします。

私の質問は、どのようにそれがArrayBlockingQueueを書き換えることなく、この機能を実現することが可能となりますか? 同期ための必要はありませんことを

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentMap; 

public class DistinctBlockingQueue<E> implements BlockingQueue<E> { 

    private final BlockingQueue<E> backingQueue; 
    private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>(); 

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) { 
     this.backingQueue = backingQueue; 
    } 

    @Override 
    public boolean offer(E e) { 
     boolean[] add = {false}; 
     elements.computeIfAbsent(e, k -> add[0] = true); 
     return add[0] && backingQueue.offer(e); 
    } 

    @Override 
    public E take() throws InterruptedException { 
     E e = backingQueue.take(); 
     elements.remove(e); 
     return e; 
    } 

    // Other methods 

} 

注:

答えて

3

おそらく1つの、シンプルで高速なソリューションをjava.util.concurrent.ConcurrentMapを使用することです。

EDITは:

java.util.concurrent.ConcurrentHashMapのドキュメントは言う:

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentMap; 

public class DistinctBlockingQueue<E> implements BlockingQueue<E> { 

    private final BlockingQueue<E> backingQueue; 
    private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>(); 

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) { 
     this.backingQueue = backingQueue; 
    } 

    @Override 
    public boolean offer(E e) { 
     boolean[] add = {false}; 
     elements.computeIfAbsent(e, k -> add[0] = true); 
     if (add[0]) { 
      // make sure that the element was added to the queue, 
      // otherwise we must remove it from the map 
      if (backingQueue.offer(e)) { 
       return true; 
      } 
      elements.remove(e); 
     } 
     return false; 
    } 

    @Override 
    public E take() throws InterruptedException { 
     E e = backingQueue.take(); 
     elements.remove(e); 
     return e; 
    } 

    @Override 
    public String toString() { 
     return backingQueue.toString(); 
    } 

    // Other methods 

} 

と...のは、いくつかの同時実行テストを行いましょう:

/** 
* If the specified key is not already associated with a value, 
* attempts to compute its value using the given mapping function 
* and enters it into this map unless {@code null}. The entire 
* method invocation is performed atomically, so the function is 
* applied at most once per key. Some attempted update operations 
* on this map by other threads may be blocked while computation 
* is in progress, so the computation should be short and simple, 
* and must not attempt to update any other mappings of this map. 
* 
* @param key key with which the specified value is to be associated 
* @param mappingFunction the function to compute a value 
* @return the current (existing or computed) value associated with 
*   the specified key, or null if the computed value is null 
* @throws NullPointerException if the specified key or mappingFunction 
*   is null 
* @throws IllegalStateException if the computation detectably 
*   attempts a recursive update to this map that would 
*   otherwise never complete 
* @throws RuntimeException or Error if the mappingFunction does so, 
*   in which case the mapping is left unestablished 
*/ 
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) { 
    ... 
} 

は、私はいくつかの追加のチェックを追加しました:

BlockingQueue<String> queue = new DistinctBlockingQueue<>(new ArrayBlockingQueue<>(100)); 

int n = 1000; 
ExecutorService producerService = Executors.newFixedThreadPool(n); 

Callable<Void> producer =() -> { 
    queue.offer("a"); 
    return null; 
}; 

producerService.invokeAll(IntStream.range(0, n).mapToObj(i -> producer).collect(Collectors.toList())); 
producerService.shutdown(); 

System.out.println(queue); // prints [a] 
+0

あなたの答えをありがとうが、あなたの解決策は良くありません。この問題は、2つのスレッドが同じアイテムを提供しようとしたときに発生します。最初のスレッドがreturn文に達してコンテキストが切り替わったとすると、2番目のスレッドが実行を開始し、return文に達するとeが提供され、最初のスレッドが再スケジュールされるとeがもう一度提供されます。不足している小切手とオファーの間でマップとキューが変更されないようにする必要があります。そのため、ロックを使用する必要があります。または、より賢明な同期手段を使用する必要があります。 –

+1

@DLevantこれは決して起こりません。 'computeIfAbsent'はトリックを行います...更新を参照してください。 – FaNaJ

+0

最初のスレッドでadd [0]がtrueに設定された直後に別のスレッドがスケジュールされないようにする方法はありません。その場合、同じ要素を2回追加できます。並行性の問題に関しては、これらのテストはほとんど証明できません。 notify()のようなものを追加してみてください。待機(1000);あなたのコードでif(add [0])の前に、私はあなたが私が何を話しているのを見なければならないと信じています。 –

1

私の質問に対する部分的な答えが見つかりました。オファー操作は私が望むようにアトミックではありませんが、キューは区別されます。

public class DistinctBlockingQueue<E> implements BlockingQueue<E> { 
    private final BlockingQueue<E> backingQueue; 
    private final Set<E> entriesSet = ConcurrentHashMap.newKeySet(); 

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) { 
     this.backingQueue = backingQueue; 
     entriesSet.addAll(backingQueue); 
    } 

    @Override 
    public boolean offer(E e) { 
     if (!entriesSet.add(e)) 
      return false; 

     boolean added = backingQueue.offer(e); 
     if (!added) { 
      entriesSet.remove(e); 
     } 

     return added; 
    } 

    @Override 
    public E take() throws InterruptedException { 
     E e = backingQueue.take(); 
     entriesSet.remove(e); 

     return e; 
    } 

    // Other methods... 
} 

妥当なパフォーマンスを得るために、私はとにかく使用したいので、追加のセットは問題ではありません。

しかし、この実装では、バウンドキュー実装(ArrayBlockingQueueなど)と組み合わせて使用​​すると、セットがバインドされないため、セットが非常に大きくなる可能性がありますブロックされた提供。

この解決策は、明らかに原子的でなければならない操作を分割するので、私が見落としている他の問題があるはずと思われます。