2017-11-02 7 views
1

GFのRDF(N-Triplesではテキストファイルなので)を読み込み、何らかの形で変換するデータフローパイプライン(SDK 2.1.0、Apache Beam 2.2.0)があります。それをGCSに書き戻しますが、別のバケットに書き出します。このパイプラインでは、3つの単一ファイル(1つの入力につき1つのファイル)であるサイド入力を使用し、それらをParDoで使用します。SideInputsがDataFlowのパイプラインのデータを壊します

JavaでRDFを使用するにはApache Jenaを使用します。そのため、各ファイルはModelクラスのインスタンスに読み込まれます。 DataFlowにはCoderがありませんので、私自身で開発しました(RDFModelCoder、下記参照)。それは私が作成した他のパイプラインの数でうまく動作します。

問題この特定のパイプラインを使用すると、サイド入力を追加すると、実行が失敗し、データの破損を示す例外が発生します。つまり、ガベージが追加されます。サイド入力を削除すると、パイプラインの実行は正常に終了します。

例外(それはRDFModelCoderからスローされます、以下を参照してください):

Caused by: org.apache.jena.atlas.RuntimeIOException: java.nio.charset.MalformedInputException: Input length = 1 
    at org.apache.jena.atlas.io.IO.exception(IO.java:233) 
    at org.apache.jena.atlas.io.CharStreamBuffered$SourceReader.fill(CharStreamBuffered.java:77) 
    at org.apache.jena.atlas.io.CharStreamBuffered.fillArray(CharStreamBuffered.java:154) 
    at org.apache.jena.atlas.io.CharStreamBuffered.advance(CharStreamBuffered.java:137) 
    at org.apache.jena.atlas.io.PeekReader.advanceAndSet(PeekReader.java:235) 
    at org.apache.jena.atlas.io.PeekReader.init(PeekReader.java:229) 
    at org.apache.jena.atlas.io.PeekReader.peekChar(PeekReader.java:151) 
    at org.apache.jena.atlas.io.PeekReader.makeUTF8(PeekReader.java:92) 
    at org.apache.jena.riot.tokens.TokenizerFactory.makeTokenizerUTF8(TokenizerFactory.java:48) 
    at org.apache.jena.riot.lang.RiotParsers.createParser(RiotParsers.java:57) 
    at org.apache.jena.riot.RDFParserRegistry$ReaderRIOTLang.read(RDFParserRegistry.java:198) 
    at org.apache.jena.riot.RDFParser.read(RDFParser.java:298) 
    at org.apache.jena.riot.RDFParser.parseNotUri(RDFParser.java:288) 
    at org.apache.jena.riot.RDFParser.parse(RDFParser.java:237) 
    at org.apache.jena.riot.RDFParserBuilder.parse(RDFParserBuilder.java:417) 
    at org.apache.jena.riot.RDFDataMgr.parseFromInputStream(RDFDataMgr.java:870) 
    at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:268) 
    at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:254) 
    at org.apache.jena.riot.adapters.RDFReaderRIOT.read(RDFReaderRIOT.java:69) 
    at org.apache.jena.rdf.model.impl.ModelCom.read(ModelCom.java:305) 

そして、ここであなたが(最後に)ゴミを見ることができます:

<http://example.com/typeofrepresentative/08> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#NamedIndividual> . ������** �����I��.�������������u������� 

パイプライン:

val one = p.apply(TextIO.read().from(config.getString("source.one"))) 
      .apply(Combine.globally(SingleValue())) 
      .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES))) 

val two = p.apply(TextIO.read().from(config.getString("source.two"))) 
      .apply(Combine.globally(SingleValue())) 
      .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES))) 

val three = p.apply(TextIO.read().from(config.getString("source.three"))) 
      .apply(Combine.globally(SingleValue())) 
      .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES))) 

val sideInput = PCollectionList.of(one).and(two).and(three) 
       .apply(Flatten.pCollections()) 
       .apply(View.asList()) 

p.apply(RDFIO.Read 
        .from(options.getSource()) 
        .withSuffix(RDFLanguages.strLangNTriples)) 
.apply(ParDo.of(SparqlConstructETL(config, sideInput)) 
         .withSideInputs(sideInput)) 
.apply(RDFIO.Write 
        .to(options.getDestination()) 
        .withSuffix(RDFLanguages.NTRIPLES)) 

ここで全体の画像を提供するのは、SingleValueと012の実装ですParDos:

class SingleValue : SerializableFunction<Iterable<String>, String> { 
    override fun apply(input: Iterable<String>?): String { 
     if (input != null) { 
      return input.joinToString(separator = " ") 
     } 
     return "" 
    } 
} 

class ConvertToRDFModel(outputLang: Lang) : DoFn<String, Model>() { 
    private val lang: String = outputLang.name 

    @ProcessElement 
    fun processElement(c: ProcessContext?) { 
     if (c != null) { 
      val model = ModelFactory.createDefaultModel() 
      model.read(StringReader(c.element()), null, lang) 
      c.output(model) 
     } 
    } 
} 

RDFModelCoderの実装:

class RDFModelCoder(private val decodeLang: String = RDFLanguages.strLangNTriples, 
        private val encodeLang: String = RDFLanguages.strLangNTriples) 
    : AtomicCoder<Model>() { 

    private val LOG = LoggerFactory.getLogger(RDFModelCoder::class.java) 

    override fun decode(inStream: InputStream): Model { 
     val bytes = StreamUtils.getBytes(inStream) 
     val model = ModelFactory.createDefaultModel() 

     model.read(ByteArrayInputStream(bytes), null, decodeLang) // the exception is thrown from here 

     return model 
    } 

    override fun encode(value: Model, outStream: OutputStream?) { 
     value.write(outStream, encodeLang, null) 
    } 

} 

私は側の入力が複数回ファイルにチェックは、彼らがいいですよ、彼らはUTF-8エンコーディングを持っています。

+0

これはJenaの問題ではありません。入力が有効なUTF-8ではありません。 PeekReaderは、文字からバイトへの変換を改善するために、かなり大きなバッファ(128Kバイト)をスラップします。したがってMalformedInputException。長さ= 1は、そのブロック内で1バイトのUTF-8シーケンスが不正であることを意味します。 – AndyS

答えて

3

ほとんどの場合、エラーはRDFModelCoderの実装にあります。 encode/decodeを実装する場合、提供されたInputStreamOutputStreamは、現在のエンコード/デコードされているインスタンスによって排他的に所有されているわけではないことに注意してください。例えば。現在のModelのコード化された形式の後に、より多くのデータがInputStreamに存在する可能性があります。 StreamUtils.getBytes(inStream)を使用している場合は、現在エンコードされているModelのデータとストリームにあったデータの両方を取得しています。

通常、新しいCoderを書き込むときは、ストリームを手で解析するのではなく、既存のCoderを組み合わせることをお勧めします。エラーが起こりにくいです。モデルをbyte[]に変換し、ByteArrayCoder.of()を使用してエンコード/デコードすることをお勧めします。

+0

残念ながら、Modelは簡単にシリアライズできないので、Java SerializationメカニズムまたはKryoを使用してシリアライズすることはできません。そのため、シリアライザをテキスト形式に書き込んで読み戻す必要があります。私はこの作業にKryoを使用しようとしましたが、成功しませんでした。 –

+0

入力ストリームから必要以上のものを読み取らないようにするにはどうすればよいですか? –

+0

それは助けたように見えます、私はまだテストしていますが、最初のテストは合格しました!私はStringUtf8Coderを見て、それらが 'VarInt.encode'と' VarInt.decode'を呼び出して、ストリーム内のチャンクの長さを読み書きすることを発見しました。なぜ私は以前にそれを見つけられなかったのかわかりません。ドキュメンテーションがそれほど明確ではないからです... –

1

Apache Jenaは、BeamがHadoop InputFormat IOをサポートしているので、Hadoop IOをサポートするモジュールであるElephas IOを提供しています。これを使用してNTriplesファイルを読み取ることができます。

NTriples support in Elephasは(実際にはそれがすべてでModelを使用しません)IOを並列化し、メモリにモデル全体をキャッシュ避けることができるので、これは可能性がはるかに効率的になります。もちろん

Configuration myHadoopConfiguration = new Configuration(false); 

// Set Hadoop InputFormat, key and value class in configuration 
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", 
           NTriplesInputFormat.class, InputFormat.class); 
myHadoopConfiguration.setClass("key.class", LongWritable.class, Object.class); 
myHadoopConfiguration.setClass("value.class", TripleWritable.class, Object.class); 
// Set any other Hadoop config you might need 

// Read data only with Hadoop configuration. 
p.apply("read", 
     HadoopInputFormatIO.<LongWritable, TripleWritable>read() 
     .withConfiguration(myHadoopConfiguration); 

これにより、全体のパイプラインを幾分リファクタリングする必要が生じる場合があります。

+0

あなたの答えをありがとう! –

関連する問題