2016-08-11 3 views
0

ETLプロセスの 'L'部分を処理するアプリケーションを作成しています。 Google Cloud Platformのストレージバケット(各ファイルには1つのCassandraテーブルに対応するデータが含まれ、各行は指定されたテーブルに挿入される1つのエントリを表します)からファイルを取得し、そのデータをCassandraに挿入します。BufferedReader SocketException:Cassandraデータを挿入中に接続がリセットされる

最大のファイル(現在問題を起こしている唯一のファイル)は約900KB(〜25k行、4列、多分50文字)です。次に大きなファイルは約300KBです。

問題は、900KBのファイルからレコードを挿入しようとすると、ジョブがほぼ半分、多分少し少なくなってからjava.net.SocketException: Connection resetになるという問題です。このコードは、カップルの異なるクラス間で分割され

*The GCP related stuff* 
Storage storage = StorageFactory.getService(); 
Storage.Objects.Get listRequest = storage.objects().list(bucketName); 
List<StorageObject> results = new ArrayList<>(); 
Objects objects; 

do { 
    objects = listRequest.execute(); 
    results.addAll(objects.getItems()); 
    listRequest.setPageToken(objects.getNextPageToken()); 
} while (objects.getNextPageToken() != null); 

List<Storage.Objects.Get> files = new ArrayList<>(); 
for (StorageObject storageObject : storageObjects) { 
    files.add(storage.objects().get(bucketName, storageObject.getName()); 
} 

*The processing stuff* 
for (Storage.Objects.Get file : files) { 
    file.getMediaHttpDownloader().setDirectDownloadEnabled(true); 
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.executeMediaAsInputStream(), "UTF-8"))) { 
     String line = reader.readLine(); 
     while (line != null) { 
      String[] columnData = line.trim().split("\\|"); 
      DomainObject domainObject = convertLineToObject(columnData); 
      domainObjectRepository.saveObject(domainObject); 
      line = reader.readLine(); 
     } 
    } catch (Exception ex) { 
     log.error("log stuff" + ex.toString); 
    } 

は、ここでは、コードです。余分な詳細なしで包括的なコードを提供しようとしています。 convertLineToObject関数は単に文字列配列をとり、new DomainObject()を作成し、columnData配列の各インデックスを適切なフィールドに設定します。

私はDAOプロバイダを作成し、データベース操作を処理する社内ライブラリを使用しています。

domainObjectDAOProvider.getDAO(DomainObject.class).insert(domainObject); 

insert()呼び出しがBoundStatementを構築し、その文に​​呼び出す:domainObjectRepository.saveDomainObject()コールは、そのライブラリを呼び出すコードの1行だけです。

は、ここで私が取得していますスタックトレースです:

java.net.SocketInputStream.read(SocketInputStream.java:209) 
java.net.SocketInputStream.read(SocketInputStream.java:141) 
sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593) 
sun.security.ssl.InputRecord.read(InputRecord.java:532) 
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973) 
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930) 
sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
java.io.BufferedInputStream.read1(BufferedInputStream.java:284) 
java.io.BufferedInputStream.read(BufferedInputStream.java:345) 
sun.net.www.MeteredStream.read(MeteredStream.java:134) 
java.io.FilterInputStream.read(FilterInputStream.java:133) 
sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3336) 
com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:169) 
sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) 
sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) 
sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) 
java.io.InputStreamReader.read(InputStreamReader.java:184) 
java.io.BufferedReader.fill(BufferedReader.java:161) 
java.io.BufferedReader.readLine(BufferedReader.java:324) 
java.io.BufferedReader.readLine(BufferedReader.java:389) 
*The application call - referring to line = reader.readLine()* 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43), java.lang.reflect.Method.invoke(Method.java:497) 
org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65) 
org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
java.lang.Thread.run(Thread.java:745) 

私だけのデータベースをコメントアウトするときは、それがない時、フラットにファイルを飛行呼び出します。その呼び出しがアクティブになると、パフォーマンスが低下し、その例外が8000行から11000行の間のどこかにスローされます。ファイル内のデータをチェックしても問題ありません。不正な形式のデータはありません。まったく奇妙なことはありません。

答えて

0

データをもっと消費せずに接続を長時間開いておくと、Google Cloud Storageは接続をアイドル状態で終了します。データベースの呼び出しに時間がかかりすぎる場合は、入力データをローカルにキャッシュします。おそらく、SocketExceptionを後の呼び出しでMediaHttpDownloader.setBytesDownloadedを使用して取得したときに最後の読み取りオフセットで新しいInputStreamReaderを生成したラッパークラスを作成することもできますが、クライアントとサーバーの両方のリソースが浪費され、

関連する問題