2016-09-15 6 views
0

単純なIDの代わりにIDのリストを受け入れるいくつかの「バッチ」ルートがあるため、プロジェクトに私のサーバーへの呼び出しをグループ化する必要がありました。リストへの並行アクセス

私のアイデアは、しばらくの間空になるプールで何かを構築することでした。 (それがうまく名付けられていますかどうかわからない)

は、だから私はこのAbstractRepository作成:

public abstract class AbstractRepository<T>{ 

    protected Context c; 

    //interval between 2 queue emptying 
    private final int POOL_DELAY = 200; 
    protected int downloadingTaskCount = 0; 

    final protected ArrayMap<String, T> memCache = new ArrayMap<>(); 
    final protected HashSet<String> queue = new HashSet<>(); 

    final protected ArrayMap<String, List<FetchedInterface<T>>> callbackMap = new ArrayMap<>(); 

    final protected List<PoolEmptiedInterface> emptinessWatchers = new ArrayList<>(); 


    protected AbstractRepository(Context c) { 
     handler.postDelayed(downloadRoutine, POOL_DELAY); 
     this.c = c; 
    } 

    public void cache(String id) { 
     if (!memCache.containsKey(memCache)) { 
      synchronized (queue) { 
       queue.add(id); 
      } 
     } 
    } 

    public void getCache(String id, FetchedInterface<T> callback) { 

     if (memCache.containsKey(id)) { 
      callback.fetched(memCache.get(id)); 
     } else { 

      synchronized (callbackMap) { 
       if (!callbackMap.containsKey(id)) { 
        callbackMap.put(id, new ArrayList<FetchedInterface<T>>()); 
       } 
       callbackMap.get(id).add(callback); 
      } 

      synchronized (queue) { 
       queue.add(id); 
      } 

     } 
    } 

    public void getCacheIdObj(List<IdObject> idsObj, final ListFetchedInterface<T> callback) { 
     ArrayList<String> ids = new ArrayList<>(); 
     for (IdObject idObj : idsObj) { 
      ids.add(idObj.getId()); 
     } 
     getCache(ids, callback); 
    } 

    public void getCache(List<String> ids, final ListFetchedInterface<T> callback) { 
     final CountDownLatch countDownLatch = new CountDownLatch(ids.size()); 
     final ArrayList<T> array = new ArrayList<>(); 
     for (String id : ids) { 
      getCache(id, new FetchedInterface<T>() { 
       @Override 
       public void fetched(T item) { 
        array.add(item); 
        countDownLatch.countDown(); 
       } 
      }); 

     } 

     new Thread(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        countDownLatch.await(); 
        callback.fetched(array); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     }).start(); 

    } 


    /** 
    * Exists for threads that want to be notified that the user queue has been flushed. 
    */ 
    public void getNotifiedWhenQueueIsEmptied(PoolEmptiedInterface<T> callback) { 
     if (downloadingTaskCount == 0 && queue.isEmpty()) { 
      callback.poolEmpty(); 
     } else { 
      synchronized (emptinessWatchers) { 
       emptinessWatchers.add(callback); 
      } 
     } 
    } 

    protected void doIt(
      final HashSet<String> processingQueue) { 
    } 

    /** 
    * Pool Loop 
    */ 
    Handler handler = new Handler(); 

    private Runnable downloadRoutine = new Runnable() { 
     @Override 
     public void run() { 

      if (!queue.isEmpty()) { 
       final HashSet<String> processingQueue = new HashSet<>(); 

       synchronized (queue) { 
        processingQueue.addAll(queue); 
        queue.clear(); 
       } 
       downloadingTaskCount++; 
       doIt(processingQueue); 
      } 

      handler.postDelayed(downloadRoutine, POOL_DELAY); 
     } 
    }; 

とその子の1 UserRepository

public class UserRepository extends AbstractRepository<UserCache> { 

    private static volatile UserRepository instance; 

    public static UserRepository getInstance(Context c) { 
     synchronized (UserRepository.class) { 
      if (instance == null) { 
       instance = new UserRepository(c); 
      } 
      return instance; 
     } 
    } 

    private UserRepository(Context c) { 
     super(c); 
    } 

    @Override 
    protected void doIt(final HashSet<String> processingQueue) { 
     Api.getInstance().backend.getUsersCache(new IdListArguments(new ArrayList<>(processingQueue))) 
       .enqueue(new Callback<Map<String, UserCache>>() { 
        @Override 
        public void onResponse(Call<Map<String, UserCache>> call, Response<Map<String, UserCache>> responseParent) { 
         Map<String, UserCache> response = responseParent.body(); 
         Iterator<String> it = processingQueue.iterator(); 
         while (it.hasNext()) { 

          String id = it.next(); 
          if (response.containsKey(id)) { 
           memCache.put(id, response.get(id)); 
           if (callbackMap.containsKey(id)) { 
            for (FetchedInterface callback : callbackMap.get(id)) { 
             callback.fetched(response.get(id)); 
            } 
           } 
           it.remove(); 
          } 
         } 

         for (PoolEmptiedInterface watcher : emptinessWatchers) { 
          watcher.poolEmpty(); 
         } 
         downloadingTaskCount--; 
         queue.addAll(processingQueue); 
        } 

        @Override 
        public void onFailure(Call<Map<String, UserCache>> call, Throwable t) { 
         queue.addAll(processingQueue); 
        } 
       }); 
    } 
} 

マイ例外:

Java.util.ConcurrentModificationException 
    at java.util.ArrayList$ArrayListIterator.next(ArrayList.java:573) 
    at com.m360.android.domain_layer.interactors.MemberInteractor.constructPagerMemberUsers(MemberInteractor.java:116) 
    at com.m360.android.domain_layer.interactors.MemberInteractor.access$000(MemberInteractor.java:29) 
    at com.m360.android.domain_layer.interactors.MemberInteractor$2$1.fetched(MemberInteractor.java:64) 
    at com.m360.android.datalayer.repositories.AbstractRepository$2.run(AbstractRepository.java:99) 

とMemberInteractor静的メソッドのみを含んでいます。クラッシュは、フォールを使用して表示されます理由2:

public static void getGroupUsers(String id, final Context c, final ErrorDisplayerInterface i, final MemberInteractorCallback callback) { 
    GroupRepository.getInstance(c).getCache(id, new AbstractRepository.FetchedInterface<Group>() { 
     @Override 
     public void fetched(Group item) { 
      UserRepository userRepositoryNew = UserRepository.getInstance(c); 
      userRepositoryNew.getCache(new ArrayList<>(item.getUsers()), new AbstractRepository.ListFetchedInterface<UserCache>() { 
       @Override 
       public void fetched(List<UserCache> items) { 
        callback.onFinish(constructPagerMemberUsers(items)); 
       } 
      }); 
     } 
    }); 
} 


private static List<PagerMemberUser> constructPagerMemberUsers(final List<UserCache> items) { 

    final List<PagerMemberUser> users = new ArrayList<>(); 
    for (UserCache item : items) { 
     users.add(new PagerMemberUser(item)); 
    } 
    return users; 
} 

私は申し訳ありませんが、コードはたくさんありますが、私はすべてが私の質問に関係していると思います。

そこで何が起こっていますか?私は問題を見ない。

答えて

2

ループごとに繰り返すリストを変更するためのConcurrentModificationExceptionが表示されます。

とても簡単修正プログラムは、あなたがそれを反復処理する前に、リストのコピーを作成するためのforeachを変更するには、この場合には、次のようになります。

for (UserCache item : new LinkedList<UserCache>(items)) { 
    users.add(new PagerMemberUser(item)); 
} 
+0

ヒュムあなたの修正が実際に動作しますが、私は同期に好まれていると思います変更メソッド。私はそれが何であるかを見ていない。 –

関連する問題