2017-11-19 11 views
1

spark-notebookでApache SparkでNLPを実行しようとしています。この特定の例では、ライブラリhttps://opennlp.apache.orgを使用して名詞句を抽出するチャンクを作成しています。 データ量が増えたため、分散コンピューティングに移行する必要があります。Apache Sparkでの外部ライブラリオブジェクトのブロードキャスト

問題は、チャンクオブジェクトをブロードキャストできないことです。 (ボードのみのアレイのような単純なオブジェクトをキャスト)のドキュメントを読んでから、私は次のことを試してみました:

import opennlp.tools.tokenize.WhitespaceTokenizer 
import opennlp.tools.cmdline.postag.POSModelLoader 
import opennlp.tools.postag.POSTaggerME 
import opennlp.tools.chunker.ChunkerModel 
import opennlp.tools.chunker.ChunkerME 
import java.io.FileInputStream 
import java.io.File 

//Instantiate the ChunkerME class 
val inputStream = new FileInputStream("fr-chunk.bin"); 
val chunkerModel = new ChunkerModel(inputStream); 
val chunkerME = new ChunkerME(chunkerModel); 

val broadCastedChunkerME = sc.broadcast(chunkerME) 

しかし、これは、次のエラーがスローされます。

java.io.NotSerializableException: opennlp.tools.chunker.ChunkerME 
Serialization stack: 
    - object not serializable (class: opennlp.tools.chunker.ChunkerME, value: [email protected]) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268) 
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303) 
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:269) 
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126) 
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88) 
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) 
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56) 
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411) 
    ... 63 elided 

私は初期化をラップする場合は仕事がありました何次のようなマップ方式で関数を呼び出し、関数内チャンカのと:

def getNPChunks(sentence: String): Array[Chunk] = { 
    import opennlp.tools.chunker.ChunkerModel 
    import opennlp.tools.chunker.ChunkerME 
    import java.io.FileInputStream 

    val inputStream = new FileInputStream("fr-chunk.bin"); 
    val chunkerModel = new ChunkerModel(inputStream); 

    //Instantiate the ChunkerME class 
    val chunkerME = new ChunkerME(chunkerModel); 

    chunkerME.chunkAsSpans(sentence); 
} 

// call the chunker 
line.map(getNPChunks) 

しかし、ここで問題は、それはiniファイルであるため、このコードは非常に非効率的であるということですrdd内のすべてのエントリについてチャンクオブジェクトを同調させる。 map関数はrddのすべてのエントリとすべてのエントリに対してgetNPChunks関数を呼び出しているので、新しいchunkerオブジェクトの作成が終了します。

この非効率的な設計のため、私のスパークスクリプトはシーケンシャルスクリプトより20倍遅く実行されています。

私は間違っていますか?

+0

"chunkerME"初期化をscalaクラスまたはscalaオブジェクトの中に記述しましたか? scalaクラス内で記述した場合は、scalaオブジェクト内に記述してからブロードキャストしてください。または、Serializableインターフェイスを実装して、Serializableクラスを作成しようとします。 –

+0

@AmitKumarは興味深いですが、現在はspark-notebookを使用していますので、チャンクは「オブジェクト」の外側で初期化する必要があります(ノートブックでは同じ概念は当てはまりません)。しかし、私は実際にこのコードを例題から見つけました。そこには、オブジェクト内にチャンクを初期化してブロードキャストした人がいました。多分それが働いた理由かもしれません。私は間違いなくspark-submitを使用するときにこれを試みます。あなたは答えとしてあなたの提案を投稿することができます。私がspark-submitをしようとすると、これが動作すればtrueに設定されます。 –

答えて

1

問題を解決する方法はmapPartitionsを使用することです。

def getChunker(): 
    val inputStream = new FileInputStream("fr-chunk.bin"); 
    val chunkerModel = new ChunkerModel(inputStream); 

    //Instantiate the ChunkerME class 
    val chunkerME = new ChunkerME(chunkerModel); 

line.mapPartitions(it => 
    val chunker = getChunker() 
    it.map(line => chunker.chunkAsSpans(line)) 
) 

mapPartitionsの詳細についてはこの回答を参照してください:https://stackoverflow.com/a/39203798/245024

+0

これはすでに素晴らしいソリューションです。この方法では、私はまだ仕事を並列化し、そう。しかし、地図と比較してmapPartitionsの方が性能が劣るか等しいかという疑問があります。または、1日の終わりのマップはmapPartitionsを使用していますか? –

+0

mapは内部的にmapPartitionsを使用しています。 sparkコードで確認できます:https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L372 – lev

-1

は、Scalaのオブジェクト内の「chunkerME」を初期化し、それを放送する1つのパーティションあたりチャンカの代わりに、行ごとに1つずつ作成することができます
その方法Scalaオブジェクトは、デフォルトではシリアル化されています。コンパイラは、スカラオブジェクトのシリアライゼーションを行います。

または、スカラクラス内で初期化する場合、シリアライズ可能な特性を拡張することによって、スカラークラスを明示的にシリアライズする必要があります。

関連する問題