マルチスレッドサーバーで問題が発生しています。私は学問的な課題として、具体的には接続を正常に閉じるためにビルドしています。ブロッキングキューで待機中のスレッドを正常に終了します
各接続はSessionクラスによって管理されます。このクラスは、接続用の2つのスレッド、つまり、DownstreamThreadとUpstreamThreadを保持します。
クライアントソケット上のUpstreamThreadブロックは、すべての受信文字列を処理して別のレイヤーに渡すメッセージにエンコードします。クライアントのメッセージが挿入されるBlockingQueue上のDownstreamThreadブロック。キューにメッセージがあると、ダウンストリームスレッドはメッセージをキューから取り出し、文字列に変換してクライアントに送信します。最終的なシステムでは、アプリケーション層が受信メッセージに対して作用し、適切なクライアントに送信するために送信メッセージをサーバーにプッシュしますが、今のところ、着信メッセージを1秒間スリープしてから、タイムスタンプが付加された発信メッセージとして送信されます。
問題は、クライアントが切断されたときにすべてを正常にシャットダウンすることです。私が主張している最初の問題は、クライアントがQUITコマンドで接続を終了していることをサーバーに知らせる通常の切断です。基本的な疑似コードは次のとおりです。
while (!quitting) {
inputString = socket.readLine() // blocks
if (inputString != "QUIT") {
// forward the message upstream
server.acceptMessage (inputString);
} else {
// Do cleanup
quitting = true;
socket.close();
}
}
上流のスレッドのメインループは入力文字列を調べます。 QUITの場合、スレッドは、クライアントが通信を終了し、ループを終了すると言うフラグを設定します。これにより、上流のスレッドがうまくシャットダウンします。
下流のスレッドのメインループは、接続終了フラグが設定されていない限り、BlockingQueue内のメッセージを待機します。そうであれば、下流のスレッドも終了するはずです。しかし、それは待っているだけで座っていません。その擬似コードは次のようになります。私はこれをテストしたとき
while (!quitting) {
outputMessage = messageQueue.take(); // blocks
sendMessageToClient (outputMessage);
}
、私は、クライアントが終了したときに、上流のスレッドがシャットダウンしますが、下流のスレッドがなかったことに気づきました。
ヘッドスクラッチが少し発生した後、私は、下流のスレッドが決して来ない着信メッセージを待っているBlockingQueueでブロックしていることに気付きました。アップストリームスレッドはQUITメッセージをチェーンの上に転送しません。
ダウンストリームスレッドを正常にシャットダウンする方法を教えてください。頭に浮かぶ最初のアイデアは、take()コールでタイムアウトを設定することでした。私はこのアイデアに熱心ではありません。なぜなら、あなたが選んだ価値はまったく満足できるものではないからです。長すぎるとゾンビのスレッドがシャットダウンする前に長時間そこに座っている、または短すぎると数分間アイドル状態になっていても有効な接続は強制終了されます。私はチェーンの上にQUITメッセージを送信することを考えましたが、それはそれがサーバーへの完全な往復を行い、次にアプリケーションに戻り、次にサーバーに戻って最後にセッションに戻る必要があります。これはどちらも優雅な解決策のようには見えません。
私はThread.stop()のドキュメントを見ましたが、とにかく正しく動作しなかったので、明らかに推奨されていません。実際にはオプションではないようです。私が持っていた別のアイデアは、何らかの理由で下流のスレッドで例外がトリガーされ、最終的にブロックされるようにすることでしたが、これは私を恐ろしいと思います。
どちらのスレッドも正常にシャットダウンできるはずですが、一方のスレッドが終了するともう一方のスレッドが単に他のスレッドのフラグを設定するよりも積極的な方法で終了する必要があるとも考えられますスレッドをチェックします。私はまだJavaに慣れていないので、私はむしろこの時点でアイディアから外れています。誰かが助言を持っているなら、それは非常に高く評価されるでしょう。
上記の擬似コードスニペットが問題の関連する部分をカバーしているとは思うが、完全性のために、以下のSessionクラスの実際のコードを含めた。フルクラスは約250ラインです。
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
import java.util.logging.*;
/**
* Session class
*
* A session manages the individual connection between a client and the server.
* It accepts input from the client and sends output to the client over the
* provided socket.
*
*/
public class Session {
private Socket clientSocket = null;
private Server server = null;
private Integer sessionId = 0;
private DownstreamThread downstream = null;
private UpstreamThread upstream = null;
private boolean sessionEnding = false;
/**
* This thread handles waiting for messages from the server and sending
* them to the client
*/
private class DownstreamThread implements Runnable {
private BlockingQueue<DownstreamMessage> incomingMessages = null;
private OutputStreamWriter streamWriter = null;
private Session outer = null;
@Override
public void run() {
DownstreamMessage message;
Thread.currentThread().setName ("DownstreamThread_" + outer.getId());
try {
// Send connect message
this.sendMessageToClient ("Hello, you are client " + outer.getId());
while (!outer.sessionEnding) {
message = this.incomingMessages.take();
this.sendMessageToClient (message.getPayload());
}
// Send disconnect message
this.sendMessageToClient ("Goodbye, client " + getId());
} catch (InterruptedException | IOException ex) {
Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex);
} finally {
this.terminate();
}
}
/**
* Add a message to the downstream queue
*
* @param message
* @return
* @throws InterruptedException
*/
public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException {
if (!outer.sessionEnding) {
this.incomingMessages.put (message);
}
return this;
}
/**
* Send the given message to the client
*
* @param message
* @throws IOException
*/
private DownstreamThread sendMessageToClient (CharSequence message) throws IOException {
OutputStreamWriter osw;
// Output to client
if (null != (osw = this.getStreamWriter())) {
osw.write ((String) message);
osw.write ("\r\n");
osw.flush();
}
return this;
}
/**
* Perform session cleanup
*
* @return
*/
private DownstreamThread terminate() {
try {
this.streamWriter.close();
} catch (IOException ex) {
Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex);
}
this.streamWriter = null;
return this;
}
/**
* Get an output stream writer, initialize it if it's not active
*
* @return A configured OutputStreamWriter object
* @throws IOException
*/
private OutputStreamWriter getStreamWriter() throws IOException {
if ((null == this.streamWriter)
&& (!outer.sessionEnding)) {
BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream());
this.streamWriter = new OutputStreamWriter (os, "UTF8");
}
return this.streamWriter;
}
/**
*
* @param outer
*/
public DownstreamThread (Session outer) {
this.outer = outer;
this.incomingMessages = new LinkedBlockingQueue();
System.out.println ("Class " + this.getClass() + " created");
}
}
/**
* This thread handles waiting for client input and sending it upstream
*/
private class UpstreamThread implements Runnable {
private Session outer = null;
@Override
public void run() {
StringBuffer inputBuffer = new StringBuffer();
BufferedReader inReader;
Thread.currentThread().setName ("UpstreamThread_" + outer.getId());
try {
inReader = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream(), "UTF8"));
while (!outer.sessionEnding) {
// Read whatever was in the input buffer
inputBuffer.delete (0, inputBuffer.length());
inputBuffer.append (inReader.readLine());
System.out.println ("Input message was: " + inputBuffer);
if (!inputBuffer.toString().equals ("QUIT")) {
// Forward the message up the chain to the Server
outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString()));
} else {
// End the session
outer.sessionEnding = true;
}
}
} catch (IOException | InterruptedException e) {
Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e);
} finally {
outer.terminate();
outer.server.deleteSession (outer.getId());
}
}
/**
* Class constructor
*
* The Core Java volume 1 book said that a constructor such as this
* should be implicitly created, but that doesn't seem to be the case!
*
* @param outer
*/
public UpstreamThread (Session outer) {
this.outer = outer;
System.out.println ("Class " + this.getClass() + " created");
}
}
/**
* Start the session threads
*/
public void run() //throws InterruptedException
{
Thread upThread = new Thread (this.upstream);
Thread downThread = new Thread (this.downstream);
upThread.start();
downThread.start();
}
/**
* Accept a message to send to the client
*
* @param message
* @return
* @throws InterruptedException
*/
public Session acceptMessage (DownstreamMessage message) throws InterruptedException {
this.downstream.acceptMessage (message);
return this;
}
/**
* Accept a message to send to the client
*
* @param message
* @return
* @throws InterruptedException
*/
public Session acceptMessage (String message) throws InterruptedException {
return this.acceptMessage (new DownstreamMessage (this.getId(), message));
}
/**
* Terminate the client connection
*/
private void terminate() {
try {
this.clientSocket.close();
} catch (IOException e) {
Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e);
}
}
/**
* Get this Session's ID
*
* @return The ID of this session
*/
public Integer getId() {
return this.sessionId;
}
/**
* Session constructor
*
* @param owner The Server object that owns this session
* @param sessionId The unique ID this session will be given
* @throws IOException
*/
public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException {
this.server = owner;
this.clientSocket = clientSocket;
this.sessionId = sessionId;
this.upstream = new UpstreamThread (this);
this.downstream = new DownstreamThread (this);
System.out.println ("Class " + this.getClass() + " created");
System.out.println ("Session ID is " + this.sessionId);
}
}
私はそのことを考えていたが、正常なシャットダウンの一部として、その場しのぎのビットを例外をスローするために何かを強制されていませんか? – GordonM
いいえ、これは動作するように設計されています。それ以外の場合は、中断される可能性のあるすべてのメソッドが特別な中断された値を返す必要があります。詳細は、http://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.htmlを参照してください。 – Brigham
私はJavaについてあまりよく分かりませんが、 'Thread.interrupt'を呼び出すとどうなりますか?' take'メソッドをブロックしていませんか?私は中断は起こらないと思うし、私が推測する 'Thread.isInterrupted'をチェックする必要があります。 – Zuljin