私は、任意に選んだ10の容量のLinkedBlockingQueueと1000行の入力ファイルを持っています。私が知っているサービスクラスのmain
メソッドに1つのExecutorService
型変数があります。Executors.newSingleThreadExecutor()
を使用しています - ファイルline == null
まで呼び出して、単一のスレッドであるbuffer.readline()を呼び出してから、ループして --tenスレッドを使用して、行を処理し、出力ファイルに書き込みます。!queue.take().equals("Stop")
までです。しかし、ファイルにいくつかの行を書き込んだ後、私がデバッグモードに入っていると、キューの容量は最終的にmax(10)に達し、処理スレッドはqueue.take()
を実行しません。すべてのスレッドはrunning
状態ですが、プロセスはqueue.put()
の後に停止します。何がこの問題を引き起こし、単一の変数ではなく、スレッドプールまたは複数のハンドラ変数の組み合わせを使用して解決できますか?1人のプロデューサExecutors.newSingleThreadExecutor()で10人のコンシューマファイル処理
//app settings to get values for keys within a properties file
AppSettings appSettings = new AppSettings();
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
maxProdThreads = 1;
maxConsThreads = 10;
ExecutorService execSvc = null;
for (int i = 0; i < maxProdThreads; i++) {
execSvc = Executors.newSingleThreadExecutor();
execSvc.submit(new ReadJSONMessage(appSettings,queue));
}
for (int i = 0; i < maxConsThreads; i++) {
execSvc = Executors.newSingleThreadExecutor();
execSvc.submit(new ProcessJSONMessage(appSettings,queue));
}
読み込みメソッドコード:
buffer = new BufferedReader(new FileReader(inputFilePath));
while((line = buffer.readLine()) != null){
line = line.trim();
queue.put(line);
}
処理と書き込みコード:処理で
while(!(line=queue.take()).equals("Stop")){
if(line.length() > 10)
{
try {
if(processMessage(line, outputFilePath) == true)
{
++count;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public boolean processMessage(String line, String outputFilePath){
CustomObject cO = new CustomObject();
cO.setText(line);
writeToFile1(cO,...);
writeToFile2(cO,...);
}
public void writeOutputAToFile(CustomObject cO,...){
synchronized(cO){
...
org.apache.commons.io.FileUtils.writeStringToFile(...)
}
}
public void writeOutputBToFile(CustomObject cO,...){
synchronized(cO){
...
org.apache.commons.io.FileUtils.writeStringToFile(...)
}
}
あなたの読み書きコードを投稿してください。私はあなたが何とかそこに邪魔していると感じている。 –
あなたの答えではありませんが、私はあなたのループの外に単一のExecutors.newCachedThreadPool()を置いてそれを使用することをお勧めします。終了すると、1回の呼び出しですべてのスレッドをシャットダウンすることができます( 'exeSvc.shutdown()')。 – teppic
スレッドダンプを実行して、スレッドがブロックされている場所を確認します。 – teppic