2017-02-21 6 views
0

JDBC Beanを使用してMySQLソースから読み込み、それをそのままBigQueryテーブルに書き込むことになっているApache Beamタスクがあります。この時点で変換は実行されません。これは、データベース出力をBigQueryに直接書き込む場合には、後で表示されます。MySQLを入力ソースとして使用してGoogle BigQueryに書き込む

これは、この操作を実行しようとしている主な方法です。

public static void main(String[] args) { 
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); 

     Pipeline p = Pipeline.create(options); 

     // Build the table schema for the output table. 
     List<TableFieldSchema> fields = new ArrayList<>(); 
     fields.add(new TableFieldSchema().setName("phone").setType("STRING")); 
     fields.add(new TableFieldSchema().setName("url").setType("STRING")); 
     TableSchema schema = new TableSchema().setFields(fields); 

     p.apply(JdbcIO.<KV<String, String>>read() 
     .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
      "com.mysql.jdbc.Driver", "jdbc:mysql://host:3306/db_name") 
      .withUsername("user") 
      .withPassword("pass")) 
      .withQuery("SELECT phone_number, identity_profile_image FROM scraper_caller_identities LIMIT 100") 
      .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() { 
       public KV<String, String> mapRow(ResultSet resultSet) throws Exception { 
       return KV.of(resultSet.getString(1), resultSet.getString(2)); 
      } 
      }) 
     .apply(BigQueryIO.Write 
      .to(options.getOutput()) 
      .withSchema(schema) 
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE))); 

     p.run(); 
    } 

しかし、私はMavenを使用してテンプレートを実行したときに、私は次のエラーを取得する:

Test.java:[184,6] cannot find symbol symbol: method apply(com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.Bound)
location: class org.apache.beam.sdk.io.jdbc.JdbcIO.Read<com.google.cloud.dataflow.sdk.values.KV<java.lang.String,java.lang.String>>

私はと思われますBigQueryIOを渡してはいけません。予想されるデータ収集を行い、それは私が現時点で苦労していることです。

この場合、MySQLからのデータをBigQueryの期待通りにする方法を教えてください。

答えて

1

私はあなたの代わりにPCollection < KV <文字列、RowMapperのが出力している文字列> >タイプのBigQueryIO.WriteへのTableRow > PCollection <を提供する必要があると思います。

また、TableRowを設定するときは、正しい列名と値のペアを使用してください。 注:あなたのKVは、列名と値のペアではなく、電話番号とURL値です(例:{"555-555-1234": "http://www.url.com"}、{"phone": "555-555- 1234" 、 『URL』:「http://www.url.comは、」})

ここでは例を参照してください。 https://beam.apache.org/documentation/sdks/javadoc/0.5.0/

あなたはこの試みを与えると、それはあなたのために働くなら、私に教えてくださいますか?お役に立てれば。

関連する問題