2016-12-08 11 views
2

編集:解決済み!Apache Beam取得元ファイル名

複数の言語のテキストファイルがあります。 Apache Beamを使って各行に言語タグを追加したいと思います。

例:

ファイルtext_enThis is a sentence.

ファイルtext_deDies ist ein Satz.

私がしたいことはこれです:

en: This is a sentence. 
de: Dies ist ein Satz. 

私が試したもの:私が最初にちょうど1 TextIO.Read.From(dataSetDirectory+"/*")を使用して.getSource()のようになりますオプションを探してみました

。しかし、これは存在しないようです。しかし、私のDoFn LanguageTagAdderのみ第一言語で初期化され、したがって、すべてのファイルが完全に罰金持った作品をこのようにファイルの読み込み

File[] files = new File(datasetDirectory).listFiles(); 
PCollectionList<String> dataSet=null; 
for (File f: files) { 
    String language = f.getName(); 
    logger.debug(language); 
    PCollection<String> newPCollection = p.apply(
      TextIO.Read.from(f.getAbsolutePath())) 
       .apply(ParDo.of(new LanguageTagAdder(language))); 

    if (dataSet==null) { 
     dataSet=PCollectionList.of(newPCollection); 
    } else { 
     dataSet.and(newPCollection); 
    } 
} 
PCollection<String> completeDataset= dataSet.apply(Flatten.<String>pCollections()) 

次は、私はこのようなもので、すべてのファイル1を読み込もうとしましたが同じ追加言語。

LanguageTagAdderは次のようになります。

public class LanguageTagAdder 
      extends DoFn<String,String> { 

     private String language; 
     public LanguageTagAdder(String language) { 
      this.language=language; 
     } 
     @ProcessElement 
     public void processElement(ProcessContext c) { 
      c.output(language+c.element()); 
     } 
    } 

私は、データをparrallelizedことができるように、この動作が意図して必要とされている実感が、どのように私は私の問題を解決するに行きますか? - それを解決する方法はありますか?

PS:(第二言語で)二度目のnew LanguageTagAdderを作成するときに、私は次の警告を得る:

DEBUG 2016-12-05 17:09:55,070 [main] de.kdld16.hpi.FusionDataset - en 
DEBUG 2016-12-05 17:09:56,216 [main] de.kdld16.hpi.FusionDataset - de 
WARN 2016-12-05 17:09:56,219 [main] org.apache.beam.sdk.Pipeline - Transform TextIO.Read2 does not have a stable unique name. This will prevent updating of pipelines. 

EDIT: を問題が

dataSet.and(newPCollection);

ラインでした

次のように書き直す必要がありました。

dataSet=dataSet.and(newPCollection);

データセットには最初のファイルしか含まれていませんでした....彼らはすべて同じ言語のタグを持っていたのでしょうか?

dataSet=dataSet.and(newPCollection); 

それがあった方法は、データセットは最初のファイルのみが含まれていました。

+0

あなたの例は、「次のようにすべてのファイルを1つずつ読み込もうとしました:」と動作するはずです。これは現在、あなたが望むものを達成するための最も簡単な方法です。 LanguageTagAdderが最初の言語のみで初期化されていることをどのようにして見つけたかについて詳しく説明できますか?あなたのコードは、LanguageTagAdderの異なるインスタンスを作成しています。Beamはそのまま使用する必要があります。実際には起こっていないことですか? – jkff

+0

また、どのランナーを使用していますか? DirectRunnerでない場合:DirectRunnerで問題が再現されますか? – jkff

+0

私はDirectRunnerを使用しています。多くのLanguageTagAddersは(多くの異なる言語で)初期化されています。コンストラクタにブレークポイントを置くと、そのことがわかります。ただし、すべてのテキストファイルのすべての行には、同じ言語のタグ「en」しかありません。 '' ' はい、これはまさに私が言っていることです。 – JoSauderGH

答えて

2

問題は、それがのように書き換えることするために必要なライン

dataSet.and(newPCollection); 

ました。

関連する問題