2016-05-30 8 views
1

SparkStreaming(Kafkaキューからメッセージを収集)を使用してTitanDBに要素を追加しようとしています。しかし、それは予想以上に難しいようです。ここで タイタン接続の定義:Spark(またはSparkStreaming)を使用してTitanDBにデータを挿入

val confPath: String = "titan-cassandra-es-spark.properties" 
val conn: TitanModule = new TitanModule(confPath) 

タイタンモジュールはTitanDB接続を設定Serializableクラスです:

... 
val configurationFilePath: String = confFilePath 
val configuration = new PropertiesConfiguration(configurationFilePath) 
val gConn: TitanGraph = TitanFactory.open(configuration) 
... 

私はカフカからのメッセージ(JSON)を収集sparkStreamingジョブを実行するときメッセージを受信して​​TitanDBに追加しようとすると、次のstackTraceで爆発します。

SparkStreamingでTitanDBにデータを追加することが可能かどうかは知っていますか? これの解決策は何か知っていますか?

18:03:50,596 ERROR JobScheduler:95 - Error running job streaming job 1464624230000 ms.0 
org.apache.spark.SparkException: Task not serializable 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
     at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
     at org.apache.spark.rdd.RDD.foreach(RDD.scala:910) 
     at salvob.SparkConsumer$$anonfun$main$1.apply(SparkConsumer.scala:200) 
     at salvob.SparkConsumer$$anonfun$main$1.apply(SparkConsumer.scala:132) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
     at scala.util.Try$.apply(Try.scala:161) 
     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.apache.commons.configuration.PropertiesConfiguration 
Serialization stack: 
     - object not serializable (class: org.apache.commons.configuration.PropertiesConfiguration, value: [email protected]8) 
     - field (class: salvob.TitanModule, name: configuration, type: class org.apache.commons.configuration.PropertiesConfiguration) 
     - object (class salvob.TitanModule, [email protected]) 
     - field (class: salvob.SparkConsumer$$anonfun$main$1$$anonfun$apply$3, name: conn$1, type: class salvob.TitanModule) 
     - object (class salvob.SparkConsumer$$anonfun$main$1$$anonfun$apply$3, <function1>) 
     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
     ... 28 more 
Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
     at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
     at org.apache.spark.rdd.RDD.foreach(RDD.scala:910) 
     at salvob.SparkConsumer$$anonfun$main$1.apply(SparkConsumer.scala:200) 
     at salvob.SparkConsumer$$anonfun$main$1.apply(SparkConsumer.scala:132) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
     at scala.util.Try$.apply(Try.scala:161) 
     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.apache.commons.configuration.PropertiesConfiguration 
Serialization stack: 
     - object not serializable (class: org.apache.commons.configuration.PropertiesConfiguration, value: [email protected]8) 
     - field (class: salvob.TitanModule, name: configuration, type: class org.apache.commons.configuration.PropertiesConfiguration) 
     - object (class salvob.TitanModule, [email protected]) 
     - field (class: salvob.SparkConsumer$$anonfun$main$1$$anonfun$apply$3, name: conn$1, type: class salvob.TitanModule) 
     - object (class salvob.SparkConsumer$$anonfun$main$1$$anonfun$apply$3, <function1>) 
     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
     ... 28 more 

答えて

1

スパークストリーミングはRDDを生成します。 RDD内のデータの処理はワーカー・ノード上で行われます。 rdd.map()内に記述するコードは、そのブロック内で参照され、処理のためにワーカー・ノードに送信されるオブジェクトとともに直列化されます。

だから、スパークによるグラフのインスタンスを使用するための理想的な方法は以下の通りです:

streamRdd.map(kafkaTuple => { 
    // create graph instance 
    // use graph instance to add/modify graph 
    // close graph instance 
}) 

しかし、これは行ごとに、新しいグラフのインスタンスを作成します。最適化として、インスタンスごとにグラフインスタンスを作成できます。

rdd.foreachPartition((rddRows: Iterator[kafkaTuple]) => { 
     val graph: TitanGraph = // create titan instance 
     val trans: TitanTransaction = graph.newTransaction() 

     rddRows.foreach(graphVertex => { 
     // do graph insertion in the above transaction 
     }) 

     createVertexTrans.commit() 
     graph.close() 
}) 

graph.newTransaction()はマルチスレッドグラフの更新に役立ちます。他の賢明なあなたはロックの例外を取得します。

私がこれまでに読んだことによると、マルチノード更新の直接サポートはありません。私が見たことから、Titan Transactionは頂点を修正しようとするたびにHBaseをロックで更新します。したがって、他のパーティションは更新を試みると失敗します。外部同期メカニズムを構築するか、rddを単一のパーティションに再分割して、上記のコードを使用して更新を行う必要があります。

+0

お返事ありがとうございます。私はHBaseを使用していませんが、これはTitan(私はCassandraを使用しています)で使用されているストレージが何であれ問題です。 – salvob

+0

ええ。ストレージの選択は重要ではありません。あなたは同時に複数のノードを介して挿入を行うことができましたか? –

0

他のスレーブマシンに渡すことができるすべてのクラスがシリアライズ可能であることを確認してください。それは非常に重要です。これらの渡されたクラス以外の変数は初期化しないでください。

Apache Spark(Not Streaming)を使用していて、うまくいきました。 TitanがSparkのバージョンを使用して以来、それを正しく得るのは簡単ではありませんでした。したがって、いくつかの依存関係の競合があります。これは動作する唯一のバージョンです

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.10</artifactId> 
    <version>1.2.2</version> 
</dependency> 

これは私がクラスタを開始した方法です。

その後、私はそれが必要かどうGithubの上で、このモジュールを解放することができるかもしれない

JavaRDD<T> javaRDD = initRetriever(); // init JavaRDD 
javaRDD.foreachPartition(iter->{ 
    Graph graph= initGraph(); 
    Parser<T> parser= initParser(graph); 
    while(iter.hasNext()){ 
     try { 
      parser.parse(iter); // extends serializable ! 
     } catch (Exception e) { 
      logger.error("Failed in importing all vertices ", e); 
      graph.tx().rollback(); 
     } 
    } 
    graph.tx().commit(); 
}); 

データを解析します。

+0

ご返信ありがとうございます。 私はJavaではなくScalaを使用しますが、それは問題ではありません。あなたのタイタンバージョンは何ですか? あなたのコードでは、TitanDBの開始設定はどこにありますか? githubリポジトリが役立つかもしれません。 – salvob

+0

It's Titan 1.0 Titan DBの設定は、そのバージョンに同梱されている標準的な設定です。コードは完全に私のものではありません。私は提携先とそれについて話し合い、公開するときにこの回答を更新します。 –

+0

ありがとうございます。 Sparkを挿入したり、Traversalに使用しましたか? グラフをトラバースするよりも挿入が難しいことを認識しています – salvob

関連する問題