ETLプロセスの 'L'部分を処理するアプリケーションを作成しています。 Google Cloud Platformのストレージバケット(各ファイルには1つのCassandraテーブルに対応するデータが含まれ、各行は指定されたテーブルに挿入される1つのエントリを表します)からファイルを取得し、そのデータをCassandraに挿入します。BufferedReader SocketException:Cassandraデータを挿入中に接続がリセットされる
最大のファイル(現在問題を起こしている唯一のファイル)は約900KB(〜25k行、4列、多分50文字)です。次に大きなファイルは約300KBです。
問題は、900KBのファイルからレコードを挿入しようとすると、ジョブがほぼ半分、多分少し少なくなってからjava.net.SocketException: Connection reset
になるという問題です。このコードは、カップルの異なるクラス間で分割され
*The GCP related stuff*
Storage storage = StorageFactory.getService();
Storage.Objects.Get listRequest = storage.objects().list(bucketName);
List<StorageObject> results = new ArrayList<>();
Objects objects;
do {
objects = listRequest.execute();
results.addAll(objects.getItems());
listRequest.setPageToken(objects.getNextPageToken());
} while (objects.getNextPageToken() != null);
List<Storage.Objects.Get> files = new ArrayList<>();
for (StorageObject storageObject : storageObjects) {
files.add(storage.objects().get(bucketName, storageObject.getName());
}
*The processing stuff*
for (Storage.Objects.Get file : files) {
file.getMediaHttpDownloader().setDirectDownloadEnabled(true);
try (BufferedReader reader = new BufferedReader(new InputStreamReader(file.executeMediaAsInputStream(), "UTF-8"))) {
String line = reader.readLine();
while (line != null) {
String[] columnData = line.trim().split("\\|");
DomainObject domainObject = convertLineToObject(columnData);
domainObjectRepository.saveObject(domainObject);
line = reader.readLine();
}
} catch (Exception ex) {
log.error("log stuff" + ex.toString);
}
:
は、ここでは、コードです。余分な詳細なしで包括的なコードを提供しようとしています。 convertLineToObject
関数は単に文字列配列をとり、new DomainObject()
を作成し、columnData
配列の各インデックスを適切なフィールドに設定します。
私はDAOプロバイダを作成し、データベース操作を処理する社内ライブラリを使用しています。
domainObjectDAOProvider.getDAO(DomainObject.class).insert(domainObject);
insert()
呼び出しがBoundStatementを構築し、その文に呼び出す:domainObjectRepository.saveDomainObject()
コールは、そのライブラリを呼び出すコードの1行だけです。
は、ここで私が取得していますスタックトレースです:
java.net.SocketInputStream.read(SocketInputStream.java:209)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
sun.security.ssl.InputRecord.read(InputRecord.java:532)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930)
sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
java.io.BufferedInputStream.read(BufferedInputStream.java:345)
sun.net.www.MeteredStream.read(MeteredStream.java:134)
java.io.FilterInputStream.read(FilterInputStream.java:133)
sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3336)
com.google.api.client.http.javanet.NetHttpResponse$SizeValidatingInputStream.read(NetHttpResponse.java:169)
sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
java.io.InputStreamReader.read(InputStreamReader.java:184)
java.io.BufferedReader.fill(BufferedReader.java:161)
java.io.BufferedReader.readLine(BufferedReader.java:324)
java.io.BufferedReader.readLine(BufferedReader.java:389)
*The application call - referring to line = reader.readLine()*
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43), java.lang.reflect.Method.invoke(Method.java:497)
org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65)
org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
私だけのデータベースをコメントアウトするときは、それがない時、フラットにファイルを飛行呼び出します。その呼び出しがアクティブになると、パフォーマンスが低下し、その例外が8000行から11000行の間のどこかにスローされます。ファイル内のデータをチェックしても問題ありません。不正な形式のデータはありません。まったく奇妙なことはありません。