cloud-bigtable-client(https://github.com/GoogleCloudPlatform/cloud-bigtable-client)を使用して、データフロー経由でBigtableに突然変異(インクリメント)を適用しようとしています。ここでHBaseMutationCoderで要素をエンコードできない
私の仕事が何をするかのハイレベルな概要です:このDoFnが原因で実行されたときに驚くべきことに、ジョブが失敗した
// c.element() is KV<String, Iterable<SomeData>>
public void processElement(ProcessContext c) {
Increment mutation = new Increment(c.element().getKey().getBytes());
for (SomeData data : c.element().getValue()) {
// Obtain cf (String), qual (String), value (long) from data.
// None of them is null.
mutation.addColumn(cf.getBytes(), qual.getBytes(), value);
}
c.output(mutation);
}
:
PCollection<SomeData> somedata = ...;
somedata.apply(ParDo.of(new CreateMutations()))
.setCoder(new HBaseMutationCoder()).apply(CloudBigtableIO.writeToTable(config));
// I don't think it is necessary to explicitly set Coder here; I tried both ways.
CreateMutations
はのように見えることDoFnです要素をHBaseMutationCoderでエンコードすることはできません。エラーメッセージの中に、それは明らかrow
、family
、column
修飾子、value
sは適切に満たされていることを示していること
(e8a8d266ed05e19f): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:2:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=1, some_string/a:8:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=9620}), (family=m, columns={some_string/m:2:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=1, some_string/m:8:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=9620}}' with coder 'HBaseMutationCoder'.
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
at ......
注意:ここでは、スタックトレースの小さな部分があります。この特定のエラーメッセージは、インクリメントする4つのセルが含まれていることを示しています。 私はDeleteとPutを使用するのに問題はありませんでしたが、Incrementを使うのは初めてのことです。行、ファミリ、修飾子、および値以外のデータが必要ですか?
本当にありがとうございます。
Increment
の代わりにPut
を使用しようとしましたが、それは機能しました((*とマークされた2行を除いて、上記と同じコードです)。
// c.element() is KV<String, Iterable<SomeData>>
public void processElement(ProcessContext c) {
Put mutation = new Put(c.element().getKey().getBytes()); //(*)
for (SomeData data : c.element().getValue()) {
// Obtain cf (String), qual (String), value (long) from data.
// None of them is null.
mutation.addImmutable(cf.getBytes(), qual.getBytes(), Bytes.toBytes(value)); //(*)
}
c.output(mutation);
}
(私はここでは関係の問題が見つかりました:How to load data into Google Cloud Bigtable from Google BigQuery を私が午前問題は、行/列のすべての家族/修飾子/値のが適切に満たされているようnull
値によって発生していないようです。)
更新:ここに完全なスタックトレースがあります。
(875583981e325b46): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'.
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:284)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext$1.outputWindowedValue(DoFnRunnerBase.java:508)
at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaIteratorsDoFn.processElement(GroupAlsoByWindowsViaIteratorsDoFn.java:123)
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:221)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:284)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:220)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:170)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:172)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:159)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'.
at com.google.cloud.dataflow.sdk.util.UserCodeException.wrap(UserCodeException.java:35)
at com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(UserCodeException.java:40)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCodeException(DoFnRunnerBase.java:369)
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:51)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158)
... 24 more
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'.
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:450)
at com.moloco.dataflow.bigtable.AptRecovery$UpdateCountPerCell.processElement(AptRecovery.java:78)
Caused by: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'.
at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:170)
at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:185)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:641)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:552)
at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:351)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:450)
at com.moloco.dataflow.bigtable.AptRecovery$UpdateCountPerCell.processElement(AptRecovery.java:78)
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:284)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext$1.outputWindowedValue(DoFnRunnerBase.java:508)
at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaIteratorsDoFn.processElement(GroupAlsoByWindowsViaIteratorsDoFn.java:123)
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:221)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeW
スタックトレースに「原因:..」がありますか?あなたはそれを共有できますか?これにより、ここで起こっていることを絞り込むことができます。 –
私は完全なスタックトレースで質問を更新しましょう - 私は確信していません –