spark-notebookでApache SparkでNLPを実行しようとしています。この特定の例では、ライブラリhttps://opennlp.apache.orgを使用して名詞句を抽出するチャンクを作成しています。 データ量が増えたため、分散コンピューティングに移行する必要があります。Apache Sparkでの外部ライブラリオブジェクトのブロードキャスト
問題は、チャンクオブジェクトをブロードキャストできないことです。 (ボードのみのアレイのような単純なオブジェクトをキャスト)のドキュメントを読んでから、私は次のことを試してみました:
import opennlp.tools.tokenize.WhitespaceTokenizer
import opennlp.tools.cmdline.postag.POSModelLoader
import opennlp.tools.postag.POSTaggerME
import opennlp.tools.chunker.ChunkerModel
import opennlp.tools.chunker.ChunkerME
import java.io.FileInputStream
import java.io.File
//Instantiate the ChunkerME class
val inputStream = new FileInputStream("fr-chunk.bin");
val chunkerModel = new ChunkerModel(inputStream);
val chunkerME = new ChunkerME(chunkerModel);
val broadCastedChunkerME = sc.broadcast(chunkerME)
しかし、これは、次のエラーがスローされます。
java.io.NotSerializableException: opennlp.tools.chunker.ChunkerME
Serialization stack:
- object not serializable (class: opennlp.tools.chunker.ChunkerME, value: [email protected])
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:269)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411)
... 63 elided
私は初期化をラップする場合は仕事がありました何次のようなマップ方式で関数を呼び出し、関数内チャンカのと:
def getNPChunks(sentence: String): Array[Chunk] = {
import opennlp.tools.chunker.ChunkerModel
import opennlp.tools.chunker.ChunkerME
import java.io.FileInputStream
val inputStream = new FileInputStream("fr-chunk.bin");
val chunkerModel = new ChunkerModel(inputStream);
//Instantiate the ChunkerME class
val chunkerME = new ChunkerME(chunkerModel);
chunkerME.chunkAsSpans(sentence);
}
// call the chunker
line.map(getNPChunks)
しかし、ここで問題は、それはiniファイルであるため、このコードは非常に非効率的であるということですrdd内のすべてのエントリについてチャンクオブジェクトを同調させる。 map関数はrddのすべてのエントリとすべてのエントリに対してgetNPChunks関数を呼び出しているので、新しいchunkerオブジェクトの作成が終了します。
この非効率的な設計のため、私のスパークスクリプトはシーケンシャルスクリプトより20倍遅く実行されています。
私は間違っていますか?
"chunkerME"初期化をscalaクラスまたはscalaオブジェクトの中に記述しましたか? scalaクラス内で記述した場合は、scalaオブジェクト内に記述してからブロードキャストしてください。または、Serializableインターフェイスを実装して、Serializableクラスを作成しようとします。 –
@AmitKumarは興味深いですが、現在はspark-notebookを使用していますので、チャンクは「オブジェクト」の外側で初期化する必要があります(ノートブックでは同じ概念は当てはまりません)。しかし、私は実際にこのコードを例題から見つけました。そこには、オブジェクト内にチャンクを初期化してブロードキャストした人がいました。多分それが働いた理由かもしれません。私は間違いなくspark-submitを使用するときにこれを試みます。あなたは答えとしてあなたの提案を投稿することができます。私がspark-submitをしようとすると、これが動作すればtrueに設定されます。 –