私はデータフローを使用していますが、ユーザーIDリストをコンマで区切って文字列を作成する必要があります。そして結果をGCSで書く。
残念ながら、DoFnのprocessElementでは、各行には多くのユーザーが存在し、java.lang.OutOfMemoryErrorという結果になります。
OutOfMemory例外を回避し、テキストファイルを含むGCSの各行に大量の行を正常に書き込む方法はありますか?
私のソースコードは以下の通りです。ソースコードここprocessElementの出力がfatである場合のメモリ不足例外の処理
PCollection<KV<String, String>> rows = someData
.apply(Combine.<String, String>perKey(new CombineUserIds()));
public static class CombineUserIds implements SerializableFunction<Iterable<String>, String> {
private static final long serialVersionUID = 0;
@Override
public String apply(Iterable<String> userIdList) {
return Joiner.on(",").join(userIdList);
}
}
、someData
は、そのキーGROUP_IDされた値であるUSER_ID PCollection<KV<String, String>>
タイプです。
そして次は、全体のエラーメッセージにある
オラクルdocsから (b997767fac436e5c): java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3332) at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421) at java.lang.StringBuilder.append(StringBuilder.java:136) at java.lang.StringBuilder.append(StringBuilder.java:76) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:457) at java.lang.StringBuilder.append(StringBuilder.java:166) at java.lang.StringBuilder.append(StringBuilder.java:76) at com.google.common.base.Joiner.appendTo(Joiner.java:111) at com.google.common.base.Joiner.appendTo(Joiner.java:152) at com.google.common.base.Joiner.join(Joiner.java:193) at com.google.common.base.Joiner.join(Joiner.java:183) at com.moloco.dataflow.ml.adhoc.GenerateMLUserProfileSet$CombineUserIds.apply(GenerateMLUserProfileSet.java:189) at com.moloco.dataflow.ml.adhoc.GenerateMLUserProfileSet$CombineUserIds.apply(GenerateMLUserProfileSet.java:184) at com.google.cloud.dataflow.sdk.transforms.Combine$IterableCombineFn.mergeToSingleton(Combine.java:1613) at com.google.cloud.dataflow.sdk.transforms.Combine$IterableCombineFn.mergeAccumulators(Combine.java:1591) at com.google.cloud.dataflow.sdk.transforms.Combine$IterableCombineFn.mergeAccumulators(Combine.java:1536) at com.google.cloud.dataflow.sdk.transforms.Combine$CombineFn$2.mergeAccumulators(Combine.java:489) at com.google.cloud.dataflow.sdk.runners.worker.GroupAlsoByWindowsParDoFnFactory$MergingKeyedCombineFn.extractOutput(GroupAlsoByWindowsParDoFnFactory.java:249) at com.google.cloud.dataflow.sdk.runners.worker.GroupAlsoByWindowsParDoFnFactory$MergingKeyedCombineFn.extractOutput(GroupAlsoByWindowsParDoFnFactory.java:216) at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn$KeyedCombineFnRunner.extractOutput(GroupAlsoByWindowsAndCombineDoFn.java:243) at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn.closeWindow(GroupAlsoByWindowsAndCombineDoFn.java:206) at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn.processElement(GroupAlsoByWindowsAndCombineDoFn.java:192) at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:190) at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53) at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:226)
あなたはローカルでパイプラインを実行している、またはGoogleのクラウドにしていますか? –
こんにちは@polleyg、私はGoogleの雲で実行されています。そして、私もn1-himem-32ワークタイプで試しました。しかし、メモリ不足の例外で失敗しました。 –