私はConcurrentHashMap
というクラスを持っており、30秒ごとに1つのスレッドで更新されてから、同じConcurrentHashMap
から複数のリーダースレッドを読み込み、getNextSocket()
メソッドを呼び出すことができます。1つのスレッドからConcurrentHashMapを読み込み、競合状態なしで複数のスレッドから読み込みますか?
以下は私のシングルトンクラスで、初期化時にConcurrentHashMap
を設定するメソッドを呼び出すと、updateSockets()
メソッドを呼び出すことによって同じマップを30秒ごとに更新するバックグラウンドスレッドを開始します。
次に、複数のスレッドから、同じマップを使用して情報を取得する次の利用可能なライブソケットを取得するメソッドgetNextSocket()
を呼び出しています。私もSocketInfo
クラスは不変かどうかは、ライブかどうかにかかわらず、すべてのソケットの状態を含んでいます。
public class SocketHolder {
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<DatacenterEnum, List<SocketInfo>> liveSocketsByDc = new ConcurrentHashMap<>();
// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketHolder INSTANCE = new SocketHolder();
}
public static SocketHolder getInstance() {
return Holder.INSTANCE;
}
private SocketHolder() {
connectToSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}
private void connectToSockets() {
Map<DatacenterEnum, ImmutableList<String>> socketsByDc = TestUtils.SERVERS;
for (Map.Entry<DatacenterEnum, ImmutableList<String>> entry : socketsByDc.entrySet()) {
List<SocketInfo> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
liveSocketsByDc.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketInfo> connect(DatacenterEnum dc, List<String> addresses, int socketType) {
List<SocketInfo> socketList = new ArrayList<>();
// ... some code here
return socketList;
}
// called from multiple reader threads to get next live available socket
public Optional<SocketInfo> getNextSocket() {
Optional<SocketInfo> liveSocket = getLiveSocket(liveSocketsByDc.get(DatacenterEnum.CORP));
return liveSocket;
}
private Optional<SocketInfo> getLiveSocket(final List<SocketInfo> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
Collections.shuffle(listOfEndPoints);
for (SocketInfo obj : listOfEndPoints) {
if (obj.isLive()) {
return Optional.of(obj);
}
}
}
return Optional.absent();
}
// update CHM map every 30 seconds
private void updateSockets() {
Map<DatacenterEnum, ImmutableList<String>> socketsByDc = TestUtils.SERVERS;
for (Entry<DatacenterEnum, ImmutableList<String>> entry : socketsByDc.entrySet()) {
List<SocketInfo> liveSockets = liveSocketsByDc.get(entry.getKey());
List<SocketInfo> liveUpdatedSockets = new ArrayList<>();
for (SocketInfo liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
boolean sent = ....;
boolean isLive = sent ? true : false;
// is this right here? or will it cause any race condition?
SocketInfo state = new SocketInfo(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(state);
}
// update map with new liveUpdatedSockets
liveSocketsByDc.put(entry.getKey(), liveUpdatedSockets);
}
}
}
質問:
私の上記のコードのスレッドが安全であるとupdateSockets()
とgetNextSocket()
方法には競合状態が存在しませんか?私updateSockets()
方法で
、私はすでにupdateSockets()
方法で初期化または30秒の次のインターバルの間にconnectToSockets()
方法で前に移入されたliveSocketsByDc
のConcurrentHashMapからList<SocketInfo>
を抽出し、その後、私は同じリストliveSockets
を反復し、に応じて新しいSocketInfo
オブジェクトを作成していますisLive
がtrueまたはfalseであるかどうか。そして、この新しいSocketInfo
オブジェクトでliveSocketsByDc
ConcurrentHashMapを更新します。これは正しいか?複数のリーダースレッドから、getNextSocket()
メソッドを呼び出すことになるので、同じマップを使用して次に使用可能なライブソケットを取得するgetLiveSocket
メソッドを呼び出します。
私はliveSockets
のリストを反復しており、isLive
のフィールドを変更するだけで新しいSocketInfo
オブジェクトを作成していますが、他のものは同じままです。これは正しいですか?
スレッドの安全性の問題がある場合は、これを解決する最善の方法は何ですか?ここで
いいえ、スレッドセーフではありません。私が見つけた最初の違反(さらに進んでいない)は、すべての読み込みが共有されたArrayListをマップから取得し、それをシャッフルすることです。 –
この質問は、おそらく私たちの姉妹サイト[Code Review](http://codereview.stackexchange.com/)よりもここではうまくいきます。 –
@JBNizet私は私の例でこれをどのように修正できますか? – user1234