GFのRDF(N-Triplesではテキストファイルなので)を読み込み、何らかの形で変換するデータフローパイプライン(SDK 2.1.0、Apache Beam 2.2.0)があります。それをGCSに書き戻しますが、別のバケットに書き出します。このパイプラインでは、3つの単一ファイル(1つの入力につき1つのファイル)であるサイド入力を使用し、それらをParDoで使用します。SideInputsがDataFlowのパイプラインのデータを壊します
JavaでRDFを使用するにはApache Jenaを使用します。そのため、各ファイルはModelクラスのインスタンスに読み込まれます。 DataFlowにはCoderがありませんので、私自身で開発しました(RDFModelCoder
、下記参照)。それは私が作成した他のパイプラインの数でうまく動作します。
問題この特定のパイプラインを使用すると、サイド入力を追加すると、実行が失敗し、データの破損を示す例外が発生します。つまり、ガベージが追加されます。サイド入力を削除すると、パイプラインの実行は正常に終了します。
例外(それはRDFModelCoder
からスローされます、以下を参照してください):
Caused by: org.apache.jena.atlas.RuntimeIOException: java.nio.charset.MalformedInputException: Input length = 1
at org.apache.jena.atlas.io.IO.exception(IO.java:233)
at org.apache.jena.atlas.io.CharStreamBuffered$SourceReader.fill(CharStreamBuffered.java:77)
at org.apache.jena.atlas.io.CharStreamBuffered.fillArray(CharStreamBuffered.java:154)
at org.apache.jena.atlas.io.CharStreamBuffered.advance(CharStreamBuffered.java:137)
at org.apache.jena.atlas.io.PeekReader.advanceAndSet(PeekReader.java:235)
at org.apache.jena.atlas.io.PeekReader.init(PeekReader.java:229)
at org.apache.jena.atlas.io.PeekReader.peekChar(PeekReader.java:151)
at org.apache.jena.atlas.io.PeekReader.makeUTF8(PeekReader.java:92)
at org.apache.jena.riot.tokens.TokenizerFactory.makeTokenizerUTF8(TokenizerFactory.java:48)
at org.apache.jena.riot.lang.RiotParsers.createParser(RiotParsers.java:57)
at org.apache.jena.riot.RDFParserRegistry$ReaderRIOTLang.read(RDFParserRegistry.java:198)
at org.apache.jena.riot.RDFParser.read(RDFParser.java:298)
at org.apache.jena.riot.RDFParser.parseNotUri(RDFParser.java:288)
at org.apache.jena.riot.RDFParser.parse(RDFParser.java:237)
at org.apache.jena.riot.RDFParserBuilder.parse(RDFParserBuilder.java:417)
at org.apache.jena.riot.RDFDataMgr.parseFromInputStream(RDFDataMgr.java:870)
at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:268)
at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:254)
at org.apache.jena.riot.adapters.RDFReaderRIOT.read(RDFReaderRIOT.java:69)
at org.apache.jena.rdf.model.impl.ModelCom.read(ModelCom.java:305)
そして、ここであなたが(最後に)ゴミを見ることができます:
<http://example.com/typeofrepresentative/08> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#NamedIndividual> . ������** �����I��.�������������u�������
パイプライン:
val one = p.apply(TextIO.read().from(config.getString("source.one")))
.apply(Combine.globally(SingleValue()))
.apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))
val two = p.apply(TextIO.read().from(config.getString("source.two")))
.apply(Combine.globally(SingleValue()))
.apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))
val three = p.apply(TextIO.read().from(config.getString("source.three")))
.apply(Combine.globally(SingleValue()))
.apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))
val sideInput = PCollectionList.of(one).and(two).and(three)
.apply(Flatten.pCollections())
.apply(View.asList())
p.apply(RDFIO.Read
.from(options.getSource())
.withSuffix(RDFLanguages.strLangNTriples))
.apply(ParDo.of(SparqlConstructETL(config, sideInput))
.withSideInputs(sideInput))
.apply(RDFIO.Write
.to(options.getDestination())
.withSuffix(RDFLanguages.NTRIPLES))
ここで全体の画像を提供するのは、SingleValue
と012の実装ですParDos:
class SingleValue : SerializableFunction<Iterable<String>, String> {
override fun apply(input: Iterable<String>?): String {
if (input != null) {
return input.joinToString(separator = " ")
}
return ""
}
}
class ConvertToRDFModel(outputLang: Lang) : DoFn<String, Model>() {
private val lang: String = outputLang.name
@ProcessElement
fun processElement(c: ProcessContext?) {
if (c != null) {
val model = ModelFactory.createDefaultModel()
model.read(StringReader(c.element()), null, lang)
c.output(model)
}
}
}
RDFModelCoder
の実装:
class RDFModelCoder(private val decodeLang: String = RDFLanguages.strLangNTriples,
private val encodeLang: String = RDFLanguages.strLangNTriples)
: AtomicCoder<Model>() {
private val LOG = LoggerFactory.getLogger(RDFModelCoder::class.java)
override fun decode(inStream: InputStream): Model {
val bytes = StreamUtils.getBytes(inStream)
val model = ModelFactory.createDefaultModel()
model.read(ByteArrayInputStream(bytes), null, decodeLang) // the exception is thrown from here
return model
}
override fun encode(value: Model, outStream: OutputStream?) {
value.write(outStream, encodeLang, null)
}
}
私は側の入力が複数回ファイルにチェックは、彼らがいいですよ、彼らはUTF-8エンコーディングを持っています。
これはJenaの問題ではありません。入力が有効なUTF-8ではありません。 PeekReaderは、文字からバイトへの変換を改善するために、かなり大きなバッファ(128Kバイト)をスラップします。したがってMalformedInputException。長さ= 1は、そのブロック内で1バイトのUTF-8シーケンスが不正であることを意味します。 – AndyS