2016-09-23 9 views
1

特定のポートでサーバーとして動作しているC++アプリケーションと対話する必要があります。パフォーマンスを向上させるために、バイナリAPI(プロトコルバッファ)を公開しています。私のRESTfulなサービスは、Spring MVCとJerseyで開発され、この新しい機能を使いたいと思っています。私は、プロトコルバッファメッセージを正常に消費して生成することができました。Java WebアプリケーションとC++サーバー間のソケット通信

私の春のWebアプリケーションでは、最初にApache Commons Poolを作成してソケット接続のプールを作成しました。これは私がソケットへの書き込み/読んでいた方法です

アップデート1:追加PooledObjectFactory実装

public class PooledSocketConnectionFactory extends BasePooledObjectFactory<Socket> { 

    private static final Logger LOGGER = LoggerFactory.getLogger(PooledSocketConnectionFactory.class); 

    final private String hostname; 
    final private int port; 

    private PooledSocketConnectionFactory(final String hostname, final int port) { 
     this.hostname = hostname; 
     this.port = port; 
    } 

    @Override 
    public Socket create() throws Exception { 
     return new Socket(hostname, port); 
    } 

    @Override 
    public PooledObject wrap(Socket socket) { 
     return new DefaultPooledObject<>(socket); 
    } 

    @Override 
    public void destroyObject(final PooledObject<Socket> p) throws Exception { 
     final Socket socket = p.getObject(); 
     socket.close(); 
    } 

    @Override 
    public boolean validateObject(final PooledObject<Socket> p) { 
     final Socket socket = p.getObject(); 
     return socket != null && socket.isConnected(); 
    } 

    @Override 
    public void activateObject(final PooledObject<SocketConnection> p) throws Exception { 
    } 

    @Override 
    public void passivateObject(final PooledObject<SocketConnection> p) throws Exception { 
    } 
} 

@Service 
@Scope("prototype") 
public class Gateway { 
    @Autowired 
    private GenericObjectPool pool; 

    public Response sendAndReceive(Request request) throws CommunicationException { 
     Response response = null; 
     final Socket socket = pool.borrowObject(); 
     try { 
      request.writeDelimitedTo(socket.getOutputStream()); 
      response = Response.parseDelimitedFrom(socket.getInputStream()); 
     } catch (Exception ex) { 
      LOGGER.error("Gateway error", ex); 
      throw new CommunicationException("Gateway error", ex); 
     } finally { 
      pool.returnObject(socket); 
     } 
     return response; 
    } 
} 

これは、最初の要求のために働くとプールが以前に使用されているソケットを返すとき、それがことが判明しましたソケットはすでに閉じています。異なる要求が同じ入出力ストリームに接続されている可能性があります。応答を読んだ後にソケットを閉じると、プーリングの目的に打ち勝ちます。シングルトンソケットを使用して注入すると、最初の要求を処理してからタイムアウトすることができます。

すべてのインスタンスでソケットを作成すると、その要求はすべて機能し、パフォーマンスは約2500マイクロ秒です。私の目標は、パフォーマンスを500マイクロ秒以内に得ることです。

要件に応じて最適な方法は何でしょうか?

アップデート2:サーバーとクライアント

package com.es.socket; 

import com.es.protos.RequestProtos.Request; 
import com.es.protos.ResponseProtos.Response; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.*; 
import java.net.ServerSocket; 
import java.net.Socket; 

public class TcpServer1 { 

    final static Logger LOGGER = LoggerFactory.getLogger(TcpServer1.class.getName()); 

    public static void main(String[] args) throws Exception { 
     ServerSocket serverSocket = new ServerSocket(Integer.parseInt(args[0])); 
     Socket socket = null; 
     while (true) { 
      try { 
       socket = serverSocket.accept(); 
      } catch (IOException e) { 
       LOGGER.warn("Could not listen on port"); 
       System.exit(-1); 
      } 

      Thread thread = new Thread(new ServerConnection1(socket)); 
      thread.start(); 
     } 
    } 
} 

class ServerConnection1 implements Runnable { 

    static final Logger LOGGER = LoggerFactory.getLogger(ServerConnection.class.getName()); 

    private Socket socket = null; 

    ServerConnection1(Socket socket) { 
     this.socket = socket; 
    } 

    public void run() { 
     try { 
      serveRequest(socket.getInputStream(), socket.getOutputStream()); 
      //socket.close(); 
     } catch (IOException ex) { 
      LOGGER.warn("Error", ex); 
     } 
    } 

    public void serveRequest(InputStream inputStream, OutputStream outputStream) { 
     try { 
      read(inputStream); 
      write(outputStream); 
     } catch (IOException ex) { 
      LOGGER.warn("ERROR", ex); 
     } 
    } 

    private void write(OutputStream outputStream) throws IOException { 
     Response.Builder builder = Response.newBuilder(); 
     Response response = builder.setStatus("SUCCESS").setPing("PING").build(); 
     response.writeDelimitedTo(outputStream); 
     LOGGER.info("Server sent {}", response.toString()); 
    } 

    private void read(InputStream inputStream) throws IOException { 
     Request request = Request.parseDelimitedFrom(inputStream); 
     LOGGER.info("Server received {}", request.toString()); 
    } 
} 

package com.es.socket; 

import com.es.protos.RequestProtos.Request; 
import com.es.protos.ResponseProtos.Response; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.*; 
import java.net.Socket; 

public class TcpClient1 { 

    final static Logger LOGGER = LoggerFactory.getLogger(TcpClient1.class.getName()); 

    private Socket openConnection(final String hostName, final int port) { 
     Socket clientSocket = null; 
     try { 
      clientSocket = new Socket(hostName, port); 
     } catch (IOException e) { 
      LOGGER.warn("Exception occurred while connecting to server", e); 
     } 
     return clientSocket; 
    } 

    private void closeConnection(Socket clientSocket) { 
     try { 
      LOGGER.info("Closing the connection"); 
      clientSocket.close(); 
     } catch (IOException e) { 
      LOGGER.warn("Exception occurred while closing the connection", e); 
     } 
    } 

    private void write(OutputStream outputStream) throws IOException { 
     Request.Builder builder = Request.newBuilder(); 
     Request request = builder.setPing("PING").build(); 
     request.writeDelimitedTo(outputStream); 
     LOGGER.info("Client sent {}", request.toString()); 
    } 

    private void read(InputStream inputStream) throws IOException { 
     Response response = Response.parseDelimitedFrom(inputStream); 
     LOGGER.info("Client received {}", response.toString()); 
    } 

    public static void main(String args[]) throws Exception { 
     TcpClient1 client = new TcpClient1(); 
     try { 
      Socket clientSocket = null; 

      LOGGER.info("Scenario 1 --> One socket for each call"); 
      for (int i = 0; i < 2; i++) { 
       clientSocket = client.openConnection("localhost", Integer.parseInt(args[0])); 
       OutputStream outputStream = clientSocket.getOutputStream(); 
       InputStream inputStream = clientSocket.getInputStream(); 
       LOGGER.info("REQUEST {}", i); 
       client.write(outputStream); 
       client.read(inputStream); 
       client.closeConnection(clientSocket); 
      } 

      LOGGER.info("Scenario 2 --> One socket for all calls"); 
      clientSocket = client.openConnection("localhost", Integer.parseInt(args[0])); 
      OutputStream outputStream = clientSocket.getOutputStream(); 
      InputStream inputStream = clientSocket.getInputStream(); 
      for (int i = 0; i < 2; i++) { 
       LOGGER.info("REQUEST {}", i); 
       client.write(outputStream); 
       client.read(inputStream); 
      } 
      client.closeConnection(clientSocket); 
     } catch (Exception e) { 
      LOGGER.warn("Exception occurred", e); 
      System.exit(1); 
     } 
    } 
} 

はここで、リクエストとレスポンスを追加するには、プロトコルバッファクラスです。シナリオ1では、両方のコールを処理することができますが、シナリオ2では、2回目のコールから返されることはありません。 Protocol Buffer APIがストリームを異なる方法で処理しているようです。以下のサンプル出力

17:03:10.508 [main] INFO c.d.e.socket.TcpClient1 - Scenario 1 --> One socket for each call 
17:03:10.537 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 0 
17:03:10.698 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.730 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.730 [main] INFO c.d.e.socket.TcpClient1 - Closing the connection 
17:03:10.731 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 1 
17:03:10.732 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Closing the connection 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - Scenario 2 --> One socket for all calls 
17:03:10.733 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 0 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - Client received status: "SUCCESS" 
ping: "PING" 
17:03:10.734 [main] INFO c.d.e.socket.TcpClient1 - REQUEST 1 
17:03:10.735 [main] INFO c.d.e.socket.TcpClient1 - Client sent ping: "PING" 
+0

ない(GenericObjectPool' 'で使用される)' PooledObjectFactory'がどのように動作するかについて十分な情報 - 多分 'passivateObject'方法は、ソケットを閉じ? –

+0

アイドル時間の特定の時間枠の後など、接続を閉じるC++アプリケーションではないでしょうか? – Gimby

+0

@Adrian工場クラスを追加 – user2459396

答えて

0

大きな苦痛の後、私はこの問題を解決できました。ソケットへの読み書きを扱っていたクラスをプロトタイプとして定義しました。したがって、ソケットへの参照が取得されると、それはクリアされませんでした(Tomcatによって管理されました)。そのような後続のソケットへの呼び出しはキューに入れられ、タイムアウトしてオブジェクトはApache Commons Poolによって破棄されます。

これを修正するために、SocketのThreadLocalでクラスSocketConnectionを作成しました。処理側では、ソケットへの読み書きを処理するコールバックを作成しました。以下のサンプルコードスニペット:

class SocketConnection { 

    final private String identity; 
    private boolean alive; 
    final private ThreadLocal<Socket> threadLocal; 

    public SocketConnection(final String hostname, final int port) throws IOException { 
     this.identity = UUID.randomUUID().toString(); 
     this.alive = true; 
     threadLocal = ThreadLocal.withInitial(rethrowSupplier(() -> new Socket(hostname, port))); 
    } 

} 

public class PooledSocketConnectionFactory extends BasePooledObjectFactory<SocketConnection> { 

    private static final Logger LOGGER = LoggerFactory.getLogger(PooledSocketConnectionFactory.class); 

    final private String hostname; 
    final private int port; 
    private SocketConnection connection = null; 

    private PooledSocketConnectionFactory(final String hostname, final int port) { 
     this.hostname = hostname; 
     this.port = port; 
    } 

    @Override 
    public SocketConnection create() throws Exception { 
     LOGGER.info("Creating Socket"); 
     return new SocketConnection(hostname, port); 
    } 

    @Override 
    public PooledObject wrap(SocketConnection socketConnection) { 
     return new DefaultPooledObject<>(socketConnection); 
    } 

    @Override 
    public void destroyObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 
     socketConnection.setAlive(false); 
     socketConnection.close(); 
    } 

    @Override 
    public boolean validateObject(final PooledObject<SocketConnection> p) { 
     final SocketConnection connection = p.getObject(); 
     final Socket socket = connection.get(); 
     return connection != null && connection.isAlive() && socket.isConnected(); 
    } 

    @Override 
    public void activateObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 
     socketConnection.setAlive(true); 
    } 

    @Override 
    public void passivateObject(final PooledObject<SocketConnection> p) throws Exception { 
     final SocketConnection socketConnection = p.getObject(); 
     socketConnection.setAlive(false); 
    } 

} 

class SocketCallback implements Callable<Response> { 

    private SocketConnection socketConnection; 
    private Request request; 

    public SocketCallback() { 
    } 

    public SocketCallback(SocketConnection socketConnection, Request request) { 
     this.socketConnection = socketConnection; 
     this.request = request; 
    } 

    public Response call() throws Exception { 
     final Socket socket = socketConnection.get(); 
     request.writeDelimitedTo(socket.getOutputStream()); 
     Response response = Response.parseDelimitedFrom(socket.getInputStream()); 
     return response; 
    } 

} 

@Service 
@Scope("prototype") 
public class SocketGateway { 

    private static final Logger LOGGER = LoggerFactory.getLogger(SocketGateway.class); 

    @Autowired 
    private GenericObjectPool<SocketConnection> socketPool; 
    @Autowired 
    private ExecutorService executorService; 

    public Response eligibility(Request request) throws DataException { 
     EligibilityResponse response = null; 
     SocketConnection connection = null; 
     if (request != null) { 
      try { 
       connection = socketPool.borrowObject(); 
       Future<Response> future = executorService.submit(new SocketCallback(connection, request)); 
       response = future.get(); 
      } catch (Exception ex) { 
       LOGGER.error("Gateway error {}"); 
       throw new DataException("Gateway error", ex); 
      } finally { 
       socketPool.returnObject(connection); 
      } 
     } 

     return response; 
    } 

} 
関連する問題