2016-05-25 13 views
3

データフローとJavaをクラウドするのが初めてのので、これが正しい質問であると思っています。クラウドデータフローのETLと解析のCSVファイル

文字列、整数、タイムスタンプなどのn個の列と行を持つcsvファイルがあります。列ごとに新しいPCollectionを作成する必要がありますか?

私は例で見つけた文書のほとんどは、のようなもののラインに沿っている:

PCollection<String> data = p.apply(TextIO.Read.from("gs://abc/def.csv")); 

しかし、私には、それは文字列として全体のCSVファイルをインポートしても意味がありません。私はここで何が欠けていますか?私は自分のPCコレクションをどうやってセットアップするべきですか?

答えて

2

この例では、ファイルに1行あたり1 Stringを含むコレクションを作成します。ファイルがある場合:

Alex,28,111-222-3344 
Sam,30,555-666-7788 
Drew,19,123-45-6789 

、コレクションは、論理的に"Alex,28,111-222-3344""Sam,30,555-666-7788"、および"Drew,19,123-45-6789"が含まれています。あなたは、例えば、変換ParDoまたはMapElementsを通じてコレクションをパイプによって、Javaでさらに解析コードを適用することができます。

class User { 
    public String name; 
    public int age; 
    public String phone; 
} 

PCollection<String> lines = p.apply(TextIO.Read.from("gs://abc/def.csv")); 
PCollection<User> users = lines.apply(MapElements.via((String line) -> { 
    User user = new User(); 
    String[] parts = line.split(","); 
    user.name = parts[0]; 
    user.age = Integer.parseInt(parts[1]); 
    user.phone = parts[2]; 
    return user; 
}).withOutputType(new TypeDescriptor<User>() {});) 
+0

が@jkffありがとう、私は今それを見てみましょう。 – chipoglesby

+1

私はString [] parts = line.split( "、(?=([^ \"] * \ "[^ \"] * \ ")* [[^ \"] * $) –

+0

予期しない列順のCSVを処理するためのより堅牢な方法はありますか? –

1
line.split(","); 

のstring.Splitは意味がありません。もし、このようなラインデータ:

、B、Cは、 "我々は、文字列にカンマが含まれている必要があり"、CSVデータに対処するためのD、E

財産の方法は、CSV形式のライブラリをインポートすることです:

 <dependency> 
      <groupId>com.opencsv</groupId> 
      <artifactId>opencsv</artifactId> 
      <version>3.7</version> 
     </dependency> 

とパルド内部の下の使用コード:

public void processElement(ProcessContext c) throws IOException { 
    String line = c.element(); 
    CSVParser csvParser = new CSVParser(); 
    String[] parts = csvParser.parseLine(line); 
}