2013-05-01 28 views





// Upstream loop: reads one line at a time from the client and forwards it to
// the server until the client sends the literal command "QUIT".
while (!quitting) {
    inputString = socket.readLine(); // blocks
    // Compare string contents with equals(); != only compares object
    // references and would treat a freshly read "QUIT" as a normal message.
    if (!"QUIT".equals (inputString)) {
        // forward the message upstream
        server.acceptMessage (inputString);
    } else {
        // Do cleanup
        quitting = true;
    }
}

上流スレッドのメインループは入力文字列を調べます。QUIT の場合、スレッドはクライアントが通信を終了したことを示すフラグを設定し、ループを抜けます。これにより、上流スレッドは正常にシャットダウンします。


// Downstream loop: takes the next queued message and sends it to the client.
// The quitting flag is only re-read after take() returns, so while take() is
// blocked a change to the flag is not noticed by this loop.
while (!quitting) { 
    outputMessage = messageQueue.take(); // blocks 
    sendMessageToClient (outputMessage); 







import java.io.*; 
import java.net.*; 
import java.util.concurrent.*; 
import java.util.logging.*; 

* Session class 
* A session manages the individual connection between a client and the server. 
* It accepts input from the client and sends output to the client over the 
* provided socket. 
public class Session { 
    private Socket    clientSocket = null; 
    private Server    server   = null; 
    private Integer    sessionId  = 0; 
    private DownstreamThread downstream  = null; 
    private UpstreamThread  upstream  = null; 
    private boolean    sessionEnding = false; 

    * This thread handles waiting for messages from the server and sending 
    * them to the client 
    private class DownstreamThread implements Runnable { 
     private BlockingQueue<DownstreamMessage> incomingMessages = null; 
     private OutputStreamWriter     streamWriter  = null; 
     private Session        outer    = null; 

     public void run() { 
      DownstreamMessage message; 
      Thread.currentThread().setName ("DownstreamThread_" + outer.getId()); 

      try { 
       // Send connect message 
       this.sendMessageToClient ("Hello, you are client " + outer.getId()); 

       while (!outer.sessionEnding) { 
        message = this.incomingMessages.take(); 
        this.sendMessageToClient (message.getPayload()); 

       // Send disconnect message 
       this.sendMessageToClient ("Goodbye, client " + getId()); 

      } catch (InterruptedException | IOException ex) { 
       Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex); 
      } finally { 

     * Add a message to the downstream queue 
     * @param message 
     * @return 
     * @throws InterruptedException 
     public DownstreamThread acceptMessage (DownstreamMessage message) throws InterruptedException { 
      if (!outer.sessionEnding) { 
       this.incomingMessages.put (message); 

      return this; 

     * Send the given message to the client 
     * @param message 
     * @throws IOException 
     private DownstreamThread sendMessageToClient (CharSequence message) throws IOException { 
      OutputStreamWriter osw; 
      // Output to client 
      if (null != (osw = this.getStreamWriter())) { 
       osw.write ((String) message); 
       osw.write ("\r\n"); 

      return this; 

     * Perform session cleanup 
     * @return 
     private DownstreamThread terminate() { 
      try { 
      } catch (IOException ex) { 
       Logger.getLogger (DownstreamThread.class.getName()).log (Level.SEVERE, ex.getMessage(), ex); 
      this.streamWriter = null; 

      return this; 

     * Get an output stream writer, initialize it if it's not active 
     * @return A configured OutputStreamWriter object 
     * @throws IOException 
     private OutputStreamWriter getStreamWriter() throws IOException { 
      if ((null == this.streamWriter) 
      && (!outer.sessionEnding)) { 
       BufferedOutputStream os = new BufferedOutputStream (outer.clientSocket.getOutputStream()); 
       this.streamWriter  = new OutputStreamWriter (os, "UTF8"); 

      return this.streamWriter; 

     * @param outer 
     public DownstreamThread (Session outer) { 
      this.outer    = outer; 
      this.incomingMessages = new LinkedBlockingQueue(); 
      System.out.println ("Class " + this.getClass() + " created"); 

    * This thread handles waiting for client input and sending it upstream 
    private class UpstreamThread implements Runnable { 
     private Session outer = null; 

     public void run() { 
      StringBuffer inputBuffer = new StringBuffer(); 
      BufferedReader inReader; 

      Thread.currentThread().setName ("UpstreamThread_" + outer.getId()); 

      try { 
       inReader = new BufferedReader (new InputStreamReader (outer.clientSocket.getInputStream(), "UTF8")); 

       while (!outer.sessionEnding) { 
        // Read whatever was in the input buffer 
        inputBuffer.delete (0, inputBuffer.length()); 
        inputBuffer.append (inReader.readLine()); 
        System.out.println ("Input message was: " + inputBuffer); 

        if (!inputBuffer.toString().equals ("QUIT")) { 
         // Forward the message up the chain to the Server 
         outer.server.acceptMessage (new UpstreamMessage (sessionId, inputBuffer.toString())); 
        } else { 
         // End the session 
         outer.sessionEnding = true; 

      } catch (IOException | InterruptedException e) { 
       Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e); 
      } finally { 
       outer.server.deleteSession (outer.getId()); 

     * Class constructor 
     * The Core Java volume 1 book said that a constructor such as this 
     * should be implicitly created, but that doesn't seem to be the case! 
     * @param outer 
     public UpstreamThread (Session outer) { 
      this.outer = outer; 
      System.out.println ("Class " + this.getClass() + " created"); 

    * Start the session threads 
    public void run() //throws InterruptedException 
     Thread upThread  = new Thread (this.upstream); 
     Thread downThread = new Thread (this.downstream); 


    * Accept a message to send to the client 
    * @param message 
    * @return 
    * @throws InterruptedException 
    public Session acceptMessage (DownstreamMessage message) throws InterruptedException { 
     this.downstream.acceptMessage (message); 
     return this; 

    * Accept a message to send to the client 
    * @param message 
    * @return 
    * @throws InterruptedException 
    public Session acceptMessage (String message) throws InterruptedException { 
     return this.acceptMessage (new DownstreamMessage (this.getId(), message)); 

    * Terminate the client connection 
    private void terminate() { 
     try { 
     } catch (IOException e) { 
      Logger.getLogger (Session.class.getName()).log (Level.SEVERE, e.getMessage(), e); 

    * Get this Session's ID 
    * @return The ID of this session 
    public Integer getId() { 
     return this.sessionId; 

    * Session constructor 
    * @param owner The Server object that owns this session 
    * @param sessionId The unique ID this session will be given 
    * @throws IOException 
    public Session (Server owner, Socket clientSocket, Integer sessionId) throws IOException { 

     this.server   = owner; 
     this.clientSocket = clientSocket; 
     this.sessionId  = sessionId; 
     this.upstream  = new UpstreamThread (this); 
     this.downstream  = new DownstreamThread (this); 

     System.out.println ("Class " + this.getClass() + " created"); 
     System.out.println ("Session ID is " + this.sessionId); 





それも考えたのですが、正常なシャットダウンの一環として何かに無理やり例外をスローさせるのは、少し場当たり的ではないでしょうか? – GordonM


いいえ、これは動作するように設計されています。それ以外の場合は、中断される可能性のあるすべてのメソッドが特別な中断された値を返す必要があります。詳細は、http://docs.oracle.com/javase/tutorial/essential/concurrency/interrupt.htmlを参照してください。 – Brigham


私は Java にあまり詳しくありませんが、`Thread.interrupt` を呼び出すとどうなるのでしょうか? `take` メソッドはブロックしているのではないですか? 割り込みは発生せず、`Thread.isInterrupted` をチェックする必要があるのではないかと思います。 – Zuljin




// Downstream loop using a sentinel message: QUIT is a distinguished message
// object placed on the queue to tell this loop to stop.
while (true) {
    outputMessage = messageQueue.take(); // blocks
    // Identity comparison is intentional: QUIT is a single sentinel instance.
    if (QUIT == outputMessage) {
        // Leave the loop instead of forwarding the sentinel to the client.
        break;
    }
    sendMessageToClient (outputMessage);
}