1

データフローランナーを使用してダイレクトランナーとローカルで両方ともパイプラインを実行するapache-beamアプリケーションがあります。ローカルでは動作しますが、googleのデータフローランナーでは失敗します。ここApache Beamで実行時に作成されたクラスをシリアライズする方法

エラートレースは、次のとおり

(9938ce94c0752c7):java.lang.RuntimeException:com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentExceptionが:com.google.cloud.dataflow.worker.MapTaskExecutorFactory $ 3.typedApply(MapTaskExecutorFactory.java:283) で直列化されたDoFnInfo をデシリアライズすることができないでcom.google.cloud.dataflow.worker.MapTaskExecutorFactory $ 3.typedApply (MapTaskExecutorFactory.java:253) (com.google.cloud.dataflow.worker.graph.Networks)$ TypeSafeNodeFunction.apply(Networks.java:55) at com.google.cloud.dataflow.worker.graph.Networks $ TypeSafeNodeFunction.apply(Networks.java:43) (com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78) at) com.google.cloud.dataflow.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:142) (com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:271) 、com.google.cloud) dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244) (com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness)$ WorkerThread.doWork(DataflowBatchWorkerHarness.java:135) (com.google.cloud.dataflow.worker) DataflowBatchWorkerHarness $ WorkerThread.call(DataflowBatchWorkerHarness.java:115) at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness $ W orkerThread.call(DataflowBatchWorkerHarness.java:102) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java。 (Thread.java:745)
原因:com.google.cloud.dataflow.worker.repackaged.com .google.common.util.concurrent.UncheckedExecutionException:java.lang.IllegalArgumentException:直列化されたDoFnInfoを直列化解除できない場合 com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ Segment.get( LocalCache.java:2214) (com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)) (com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ LocalManualCache.get(LocalCache.java:4899) at com.google.cloud.dataflow.worker.UserParDoFnFactory.create UserParDoFnFactory.java:95) at com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:66) at com.google.cloud.dataflow.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:360) com.google.cloud.dataflow.worker.MapTaskExecutorFactory $ 3.typedApply(MapTaskExecutorFactory.java:271) で ... 14もっと
によって引き起こさ:java.lang.IllegalArgumentExceptionが:org.apacheで直列化されたDoFnInfo をデシリアライズすることができません.beam.sdk.util.SerializableUtils.deserializeFromByteArray(Seriali zooUtils.java:75) at com.google.cloud.dataflow.worker.UserParDoFnFactory $ UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:64) at com.google.cloud.dataflow.worker.UserParDoFnFactory $ 1.call(UserParDoFnFactory.java :100) (com.google.cloud.dataflow.worker.UserParDoFnFactory $ 1.call(UserParDoFnFactory.java:97) 、com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ LocalManualCache $ 1.load(LocalCache.java:4904) (com.google.cloud.dataflow.worker.repackaged.com.google.common.cache)LocalCache $ LoadingValueReference.loadFuture(LocalCache.java:3628)com35.png。 google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache $ Segment.lockedGetOrLoad(LocalCache.java:2295) (com.google.cloud.dataflow.worker.repackaged.com.google.common)。 cache.LocalCache $ Segment.get(LocalCache.java:2208)によって引き起こさ ... 20もっと
:java.lang.ClassNotFoundExceptionが:java.net.URLClassLoader.findClassでHeader_H (URLClassLoader.java:381) でjava.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher $ AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) (java.lang.Class.forName0)(ネイティブメソッド) at java.lang.Class.forName(Class.java:348) at java.io .ObjectInputStream.resolveClass(ObjectInputStream.java:628) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) at java.io.ObjectInputStream .readClass(ObjectInputStream.java:1486) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1336) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData (ObjectInputStream.java:1942) at java.io.Object InputStream.readOrdinaryObject(ObjectInputStream.java:1808) のjava.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) のjava.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) のjava.io.ObjectInputStreamです。 readSerialData(ObjectInputStream.java:1942) のjava.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) のjava.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) のjava.io.ObjectInputStream.readObject( org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArrayでObjectInputStream.java:373) (SerializableUtils.java:72) ... 28もっと

それは

"...直列化DoFnInfoをデシリアライズすることができない"

を指して、 "...にjava.lang.ClassNotFoundException:Header_H"

これは私がbytebuddyコードを使ってクラスHeader_Hを作るのと関係があると思われます。私はbytebuddyを使用して、既存のソースコードにsome.classに基づいてサブクラスを構築し、実行時に構成ファイルから追加のユーザー入力を受け取りました。つまり、実行時にHeader_Hしか利用できなくなりました。

私bytebuddyコードは多少このようなものです:

builder = new ByteBuddy().subclass(some.class).name("Header_H").modifiers(PUBLIC); 
     .defineField("serialVersionUID", long.class, STATIC, PRIVATE, FINAL).value(37L) 
     .implement(Serializable.class); 

Class <?> clazz = builder.make().load(getClass().getClassLoader()).getLoaded(); 

そしてclazz(この場合はHeader_Hに)データフロー内のパイプラインに渡されます。一時的なGoogleクラウドステージの場所にあるjarファイルの内容を確認すると、some.classが表示されますが、Header_H.classではなく、おそらく "ClassNotFoundException"というエラーが発生します。

私の推論が正しければ、クラス作成時にimplement(Serializable.class)があるとすれば、データフローランナーに送信するjarファイルにランタイム作成クラスを配置するにはどうすればよいですか?

答えて

1

バイトバディを介してjarファイルにクラスを注入することができます。このようにして、すでにシステムクラスパス上にある既存のjarを変更することができます。

このAPIでは、新しいjarを作成することもできます。また、このクラスを新しいjarファイルとしてクラスパスに追加できるようにするには、Instrumentation API(Javaエージェント経由)を使用できます。エージェントの接続を回避するには、バイトバディエージェントプロジェクトを動的添付ファイルとして使用することもできます。

これはで動作します:動的な添付ファイルがGoogleクラウド上で許可されていない場合は、コマンドライン上で定期的に結合することによってこの問題を解決することができるかもしれない

File someFolder = ... 
File jar = builder.saveIn(someFolder); 
ByteBuddyAgent.install().appendToSystemClassLoaderSearch(new JarFile(jar)); 

+0

上記のコードでは、 'type.inject(somejar)'と 'type.saveIn(somefolder)'を意味しますか? – bignano

1

データフローランナーはあなたのJARファイルの内容をコントロールしていない - それは、あなたのプログラムのクラスパスを解析し、ディスクおよびGCS上のパイプラインのステージングディレクトリにコピーしてからJARを読み取ります。現在、Beamはクラスパス上のJARに含まれていないクラスを出荷する手段を提供していません。

あなたのパイプラインの仕様では、それらのJARからのクラスだけを使用する方法を見つける必要があるでしょうが、あなたのDoFnやその他のコードでByteBuddyを使用することはできます。しかし、作業者間で出荷されるもの(例えば、PCollectionのコンテンツ)は、シリアライズ可能(シリアライザでシリアライズ可能であり、別のシリアライザでデシリアライズ可能)である必要があります。

また、ByteBuddyにJARを生成させ、プログラムのクラスパスに動的に追加する方法もあります。それはうまくいくかもしれませんが、それはByteBuddy特有の質問です。私はByteBuddyにそれをどうやって行うのかについて十分に精通していません。これは、動的に生成されたクラスを含むように既存のJARファイルを変化させる

DynamicType.Unloaded<?> type = builder.make(); 
builder.inject(someJar); 

+0

ありがとう@jkff。誰かがBytebuddy側を明確にするのに役立つかどうかを見てみましょう。 – bignano

関連する問題