編集:解決済み!Apache Beam取得元ファイル名
複数の言語のテキストファイルがあります。 Apache Beamを使って各行に言語タグを追加したいと思います。
例:
ファイルtext_en: This is a sentence.
ファイルtext_de:Dies ist ein Satz.
私がしたいことはこれです:
en: This is a sentence. de: Dies ist ein Satz.
私が試したもの:私が最初にちょうど1 TextIO.Read.From(dataSetDirectory+"/*")
を使用して.getSource()
のようになりますオプションを探してみました
。しかし、これは存在しないようです。しかし、私のDoFn LanguageTagAdder
のみ第一言語で初期化され、したがって、すべてのファイルが完全に罰金持った作品をこのようにファイルの読み込み
File[] files = new File(datasetDirectory).listFiles();
PCollectionList<String> dataSet=null;
for (File f: files) {
String language = f.getName();
logger.debug(language);
PCollection<String> newPCollection = p.apply(
TextIO.Read.from(f.getAbsolutePath()))
.apply(ParDo.of(new LanguageTagAdder(language)));
if (dataSet==null) {
dataSet=PCollectionList.of(newPCollection);
} else {
dataSet.and(newPCollection);
}
}
PCollection<String> completeDataset= dataSet.apply(Flatten.<String>pCollections())
:
次は、私はこのようなもので、すべてのファイル1を読み込もうとしましたが同じ追加言語。
LanguageTagAdder
は次のようになります。
public class LanguageTagAdder
extends DoFn<String,String> {
private String language;
public LanguageTagAdder(String language) {
this.language=language;
}
@ProcessElement
public void processElement(ProcessContext c) {
c.output(language+c.element());
}
}
私は、データをparrallelizedことができるように、この動作が意図して必要とされている実感が、どのように私は私の問題を解決するに行きますか? 梁 - それを解決する方法はありますか?
PS:(第二言語で)二度目のnew LanguageTagAdder
を作成するときに、私は次の警告を得る:
DEBUG 2016-12-05 17:09:55,070 [main] de.kdld16.hpi.FusionDataset - en
DEBUG 2016-12-05 17:09:56,216 [main] de.kdld16.hpi.FusionDataset - de
WARN 2016-12-05 17:09:56,219 [main] org.apache.beam.sdk.Pipeline - Transform TextIO.Read2 does not have a stable unique name. This will prevent updating of pipelines.
EDIT: を問題が
dataSet.and(newPCollection);
次のように書き直す必要がありました。
dataSet=dataSet.and(newPCollection);
データセットには最初のファイルしか含まれていませんでした....彼らはすべて同じ言語のタグを持っていたのでしょうか?
dataSet=dataSet.and(newPCollection);
それがあった方法は、データセットは最初のファイルのみが含まれていました。
あなたの例は、「次のようにすべてのファイルを1つずつ読み込もうとしました:」と動作するはずです。これは現在、あなたが望むものを達成するための最も簡単な方法です。 LanguageTagAdderが最初の言語のみで初期化されていることをどのようにして見つけたかについて詳しく説明できますか?あなたのコードは、LanguageTagAdderの異なるインスタンスを作成しています。Beamはそのまま使用する必要があります。実際には起こっていないことですか? – jkff
また、どのランナーを使用していますか? DirectRunnerでない場合:DirectRunnerで問題が再現されますか? – jkff
私はDirectRunnerを使用しています。多くのLanguageTagAddersは(多くの異なる言語で)初期化されています。コンストラクタにブレークポイントを置くと、そのことがわかります。ただし、すべてのテキストファイルのすべての行には、同じ言語のタグ「en」しかありません。 '' ' はい、これはまさに私が言っていることです。 – JoSauderGH