2016-12-23 8 views
3

私はCSVファイルを持っていますが、あらかじめ列名がわかりません。 Google Dataflowでいくつかの変換を行った後、JSONでデータを出力する必要があります。データフローでCSVヘッダーを読み取る

ヘッダー行を取得し、すべての行にラベルを浸透させるにはどうすればよいですか?例えば

a,b,c 
1,2,3 
4,5,6 

は...(約)は以下のようになる。

{a:1, b:2, c:3} 
{a:4, b:5, c:6} 
+0

は、あなたは、JavaやPythonでそれが必要なのでしょうか? – vdolez

答えて

5

あなたが最初の行とストアヘッダデータを読み込みます(TextIO.TextSourceに類似)カスタムFileBasedSourceを実装する必要があります

@Override 
    protected void startReading(final ReadableByteChannel channel) 
    throws IOException { 
     lineReader = new LineReader(channel); 

     if (lineReader.readNextLine()) { 
      final String headerLine = lineReader.getCurrent().trim(); 
      header = headerLine.split(","); 
      readingStarted = true; 
     } 
    } 

および後者では、現在の行データへのT:私は速い(完全な)ソリューションを実装しました

@Override 
    protected boolean readNextRecord() throws IOException { 
     if (!lineReader.readNextLine()) { 
      return false; 
     } 

     final String line = lineReader.getCurrent(); 
     final String[] data = line.split(","); 

     // assumes all lines are valid 
     final StringBuilder record = new StringBuilder(); 
     for (int i = 0; i < header.length; i++) { 
      record.append(header[i]).append(":").append(data[i]).append(", "); 
     } 

     currentRecord = record.toString(); 
     return true; 
    } 

github上で利用できます。私はまた、実証するデータフローユニットテストを追加した読書:

@Test 
public void test_reading() throws Exception { 
    final File file = 
      new File(getClass().getResource("/sample.csv").toURI()); 
    assertThat(file.exists()).isTrue(); 

    final Pipeline pipeline = TestPipeline.create(); 

    final PCollection<String> output = 
      pipeline.apply(Read.from(CsvWithHeaderFileSource.from(file.getAbsolutePath()))); 

    DataflowAssert 
      .that(output) 
      .containsInAnyOrder("a:1, b:2, c:3, ", "a:4, b:5, c:6, "); 

    pipeline.run(); 
} 

どこsample.csv内容以下があります。

a,b,c 
1,2,3 
4,5,6 
+0

それはまだ新しいapacheビームバージョンと互換性がありますか? – vdolez

関連する問題