1

私はConcurrentHashMapというクラスを持っており、30秒ごとに1つのスレッドで更新されてから、同じConcurrentHashMapから複数のリーダースレッドを読み込み、getNextSocket()メソッドを呼び出すことができます。1つのスレッドからConcurrentHashMapを読み込み、競合状態なしで複数のスレッドから読み込みますか?

以下は私のシングルトンクラスで、初期化時にConcurrentHashMapを設定するメソッドを呼び出すと、updateSockets()メソッドを呼び出すことによって同じマップを30秒ごとに更新するバックグラウンドスレッドを開始します。

次に、複数のスレッドから、同じマップを使用して情報を取得する次の利用可能なライブソケットを取得するメソッドgetNextSocket()を呼び出しています。私もSocketInfoクラスは不変かどうかは、ライブかどうかにかかわらず、すべてのソケットの状態を含んでいます。

public class SocketHolder { 
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); 
    private final Map<DatacenterEnum, List<SocketInfo>> liveSocketsByDc = new ConcurrentHashMap<>(); 

    // Lazy Loaded Singleton Pattern 
    private static class Holder { 
    private static final SocketHolder INSTANCE = new SocketHolder(); 
    } 

    public static SocketHolder getInstance() { 
    return Holder.INSTANCE; 
    } 

    private SocketHolder() { 
    connectToSockets(); 
    scheduler.scheduleAtFixedRate(new Runnable() { 
     public void run() { 
     updateSockets(); 
     } 
    }, 30, 30, TimeUnit.SECONDS); 
    } 

    private void connectToSockets() { 
    Map<DatacenterEnum, ImmutableList<String>> socketsByDc = TestUtils.SERVERS; 
    for (Map.Entry<DatacenterEnum, ImmutableList<String>> entry : socketsByDc.entrySet()) { 
     List<SocketInfo> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH); 
     liveSocketsByDc.put(entry.getKey(), addedColoSockets); 
    } 
    } 

    private List<SocketInfo> connect(DatacenterEnum dc, List<String> addresses, int socketType) { 
    List<SocketInfo> socketList = new ArrayList<>(); 
    // ... some code here 
    return socketList; 
    } 

    // called from multiple reader threads to get next live available socket 
    public Optional<SocketInfo> getNextSocket() { 
    Optional<SocketInfo> liveSocket = getLiveSocket(liveSocketsByDc.get(DatacenterEnum.CORP)); 
    return liveSocket; 
    } 

    private Optional<SocketInfo> getLiveSocket(final List<SocketInfo> listOfEndPoints) { 
    if (!CollectionUtils.isEmpty(listOfEndPoints)) { 
     Collections.shuffle(listOfEndPoints); 
     for (SocketInfo obj : listOfEndPoints) { 
     if (obj.isLive()) { 
      return Optional.of(obj); 
     } 
     } 
    } 
    return Optional.absent(); 
    } 

    // update CHM map every 30 seconds 
    private void updateSockets() { 
    Map<DatacenterEnum, ImmutableList<String>> socketsByDc = TestUtils.SERVERS; 

    for (Entry<DatacenterEnum, ImmutableList<String>> entry : socketsByDc.entrySet()) { 
     List<SocketInfo> liveSockets = liveSocketsByDc.get(entry.getKey()); 
     List<SocketInfo> liveUpdatedSockets = new ArrayList<>(); 
     for (SocketInfo liveSocket : liveSockets) { 
     Socket socket = liveSocket.getSocket(); 
     String endpoint = liveSocket.getEndpoint(); 

     boolean sent = ....; 

     boolean isLive = sent ? true : false; 

     // is this right here? or will it cause any race condition? 
     SocketInfo state = new SocketInfo(socket, liveSocket.getContext(), endpoint, isLive); 
     liveUpdatedSockets.add(state); 
     } 
     // update map with new liveUpdatedSockets 
     liveSocketsByDc.put(entry.getKey(), liveUpdatedSockets); 
    } 
    } 
} 

質問:

私の上記のコードのスレッドが安全であるとupdateSockets()getNextSocket()方法には競合状態が存在しませんか?私updateSockets()方法で

、私はすでにupdateSockets()方法で初期化または30秒の次のインターバルの間にconnectToSockets()方法で前に移入されたliveSocketsByDcのConcurrentHashMapからList<SocketInfo>を抽出し、その後、私は同じリストliveSocketsを反復し、に応じて新しいSocketInfoオブジェクトを作成していますisLiveがtrueまたはfalseであるかどうか。そして、この新しいSocketInfoオブジェクトでliveSocketsByDc ConcurrentHashMapを更新します。これは正しいか?複数のリーダースレッドから、getNextSocket()メソッドを呼び出すことになるので、同じマップを使用して次に使用可能なライブソケットを取得するgetLiveSocketメソッドを呼び出します。

私はliveSocketsのリストを反復しており、isLiveのフィールドを変更するだけで新しいSocketInfoオブジェクトを作成していますが、他のものは同じままです。これは正しいですか?

スレッドの安全性の問題がある場合は、これを解決する最善の方法は何ですか?ここで

+0

いいえ、スレッドセーフではありません。私が見つけた最初の違反(さらに進んでいない)は、すべての読み込みが共有されたArrayListをマップから取得し、それをシャッフルすることです。 –

+0

この質問は、おそらく私たちの姉妹サイト[Code Review](http://codereview.stackexchange.com/)よりもここではうまくいきます。 –

+0

@JBNizet私は私の例でこれをどのように修正できますか? – user1234

答えて

1

List<SocketInfo> liveSockets = liveSocketsByDc.get(entry.getKey()); 

あなたの異なるスレッドが潜在的に並列に同じリストオブジェクトの読み込み/書き込みされています。

So:スレッドセーフではありません。これは、「外側の」スレッドセーフなデータ構造を持つのに役立ちません。そのスレッドセーフなものにはデータが含まれていますが、スレッドセーフではありません。しかし、その後、並行して「作業」しました!

+0

私のコードでこのスレッドの安全性の問題を解決する最良の方法は何ですか? – user1234

関連する問題