ArrayBlockingQueueに機能を追加しようとしています。具体的には、キューが既にキューに含まれている場合にエントリをエンキューしないというような、ユニークな要素を保持するだけです。 私が望む機能はJCIPの4.4項のVectorの拡張と同じなので、そこでのアプローチを使って実装しようとしました。 ArrayBlockingQueueを継承したクラスとして、私はそれへの参照を取得できませんでしたので、パッケージプライベートReentrantLockのを使用して、相互排他を実装しているため拡張することでArrayBlockingQueueに機能を追加する
- 実装は、動作しません。たとえそれが機能したとしても、これは脆弱なアプローチになります。
- クライアント側のロックをサポートしていないため、クライアント側のロックによる実装は機能しません。 ArrayBlockingQueueを構成するとき
- は、組成物による
実装は、このような
public class DistinctBlockingQueue<E> implements BlockingQueue<E> { private final BlockingQueue<E> backingQueue; public DistinctBlockingQueue(BlockingQueue<E> backingQueue) { this.backingQueue = backingQueue; } @Override public synchronized boolean offer(E e) { if (backingQueue.contains(e)) { return false; } return backingQueue.offer(e); } @Override public synchronized E take() throws InterruptedException { return backingQueue.take(); } // Other methods... }
としてコードを生成する、最初に移動するための方法のように思えた残念ながら、このアプローチは、以下のような単純なシナリオでは、デッドロックが得られます。
- スレッドAはtake()を呼び出し、同期ロックとArrayBlockingQueueの内部ロックを取得します。
- スレッドAは、キューが空であり、ArrayBlockingQueueの内部ロックを解放していることを見てブロックします。
- スレッドBは、要素を持つoffer()を呼び出しますが、同期ロックを取得することはできず、永久にブロックします。
私の質問は、どのようにそれがArrayBlockingQueueを書き換えることなく、この機能を実現することが可能となりますか? 同期ための必要はありませんことを
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class DistinctBlockingQueue<E> implements BlockingQueue<E> {
private final BlockingQueue<E> backingQueue;
private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();
public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
this.backingQueue = backingQueue;
}
@Override
public boolean offer(E e) {
boolean[] add = {false};
elements.computeIfAbsent(e, k -> add[0] = true);
return add[0] && backingQueue.offer(e);
}
@Override
public E take() throws InterruptedException {
E e = backingQueue.take();
elements.remove(e);
return e;
}
// Other methods
}
注:
あなたの答えをありがとうが、あなたの解決策は良くありません。この問題は、2つのスレッドが同じアイテムを提供しようとしたときに発生します。最初のスレッドがreturn文に達してコンテキストが切り替わったとすると、2番目のスレッドが実行を開始し、return文に達するとeが提供され、最初のスレッドが再スケジュールされるとeがもう一度提供されます。不足している小切手とオファーの間でマップとキューが変更されないようにする必要があります。そのため、ロックを使用する必要があります。または、より賢明な同期手段を使用する必要があります。 –
@DLevantこれは決して起こりません。 'computeIfAbsent'はトリックを行います...更新を参照してください。 – FaNaJ
最初のスレッドでadd [0]がtrueに設定された直後に別のスレッドがスケジュールされないようにする方法はありません。その場合、同じ要素を2回追加できます。並行性の問題に関しては、これらのテストはほとんど証明できません。 notify()のようなものを追加してみてください。待機(1000);あなたのコードでif(add [0])の前に、私はあなたが私が何を話しているのを見なければならないと信じています。 –