0

私はCassandraには比較的新しいので、Executor Poolを介して実行されるプリペアドステートメントを使用してデータを取得しようとしています。私が受け取っているデータが一貫していないように見えます。Cassandraがスレッドプールを介してデータを取得する

私はこのuser_connectionsテーブルを持っています。ここで、user_idは行キー、friends_idリストはセットカラムです。 私はこの別のテーブル、friends_infoテーブルを持っています。ここでfriend_idは行キーで、他のすべての情報はカラムです。

ユーザーAAのフレンドリストを取得しようとしているときに、フレンドリストBBB、CCC、DDDを取得しています。どちらが完璧です。

プリペアドステートメントを使用して、実行プログラムプール経由でBBB、CCC、DDDを取得しようとしています。データに矛盾があります。すべての3つのレコードがBBBで、時には3つのレコードがすべてBBBで、1つがCCCなどです。

私が使用しているメソッドと関連するクラスを提供しました。これで私。準備されたステートメントはスレッドセーフであり、期待どおりに動作することが期待されています。

public Set<User> listUserConnections(String userId) { 
    Session session = client.getSession(); 

    Set<String> listUserConnectionIds = listUserConnections(userId, session); 

    if (listUserConnectionIds.size() == 0) 
     return new HashSet<User>(); 



    Set<User> listConnectionUserDetails = retrieveUserConnectionProfileInfo(
      listUserConnectionIds, session); 
    return listConnectionUserDetails; 

} 



private Set<User> retrieveUserConnectionProfileInfo(Set<String> listUserConnectionIds, 
     Session session) { 


    Set<Callable<User>> callables = new HashSet<Callable<User>>(); 


    for(String key:listUserConnectionIds){ 



     logger.info("about to add callable" + key); 

     Callable<User> callable = new QueryTask(key); 

     callables.add(callable); 

    } 
    // invoke in parallel 
    List<Future<User>> futures; 
    Set<User> users = new HashSet<User>(); 

    // TODO Revisit this 
    ExecutorService executorPool = Executors.newFixedThreadPool(100); 

    try { 
     futures = executorPool.invokeAll(callables); 
     for (Future<User> f : futures) { 

      User user = f.get(); 
      users.add(user); 

      logger.info("User from future"+user.getUserId()); 

     } 
    } catch (ExecutionException e) { 
     logger.error("Error in retrieving the stores in parallel ", e); 
    } catch (InterruptedException e) { 
     logger.error("Error in retrieving the stores in parallel as it was interrupted ", e); 
    } finally { 
     executorPool.shutdown(); 
    } 

    return users; 
} 

// Executorのプールクラス

class QueryTask implements Callable<User> { 

    private String userName; 

    // final PreparedStatement statement = 
    // client.getSession().prepare(QueryConstants.GET_ALL_STORE_BRANDS); 

    QueryTask(String name) { 
     this.userName = name; 

    } 

    @Override 
    public User call() throws Exception { 
     // -------------I am seeing the userName is correct------------- for example BBB 
     logger.info("inside call processing queries for " + userName); 

     //------------This is a prepared statement, userPreparedStatement.getbStUserInfo() 
     BoundStatement bStUserInfo = userPreparedStatement.getbStUserInfo(); 
     bStUserInfo.bind(userName); 
     Session session = client.getSession(); 
     ResultSet rs = session.execute(bStUserInfo); 

     User user = new User(); 

     Iterator<Row> rowIterator = rs.iterator(); 
     while (rowIterator.hasNext()) 
     { 
     Row row = rowIterator.next(); 

     //-------This user id is not right 
     logger.info("Inside the callable after retrieval"+row.getString(TableConstants.Users.COLUMN_NAME_USER_ID)); 
     user.setUserId(row.getString(TableConstants.Users.COLUMN_NAME_USER_ID)); 



     return user; 
     } 

     logger.info("outside the while loop"); 
     return user; 

    } 

} 
+0

あなたの 'BoundStatement'インスタンスがどこに' new'しているのかわかりません。 – Ralf

+2

補足として、ドライバはすでに非同期APIを提供しているので、ほとんどすべてのQueryTask/Executorコードを取り除き、テストに集中することができます。 –

答えて

0

この上で私に戻って取得するためにそんなに@Ralfとアレックス・ポペスクありがとうございます。 Datastaxには、Aync呼び出しがどのように働くかについて深く掘り下げたドキュメンテーションがありました。

@ Alex Popescuありがとう、私は彼らのaync呼び出しを試み、それは私のために正常に働いた。

関連する問題