2016-11-19 18 views
0

非常に包括的な例のVictor Jabor blogに従ってもこれを動作させることができませんでした。私は彼がすべての最新の依存関係を記述して使用しているので、彼の構成に従ってきました。私はVictorが1つのデータベースから読み込み、別のデータベースに書き込もうとしているので、私はパーティション化せずにこの作業をしていますが、5分以内に5〜1,000万行を読み込む必要があるため、パフォーマンスを向上させるためにパーティション化が必要です。スプリングブートバッチ分割JdbcCursorItemReaderエラー

以下が動作するように思われる: 1)ColumnRangePartitioner 2)TaskExecutorPartitionHandlerはグリッド寸法に基づいて、ステップタスクの正確な数を作成し、スレッド 3の正確な数を生成します)ColumnRangePartitionerによって設定stepExecutionからsetPreparedStatementSetter。

しかし、アプリケーションを実行すると、JdbcCursorItemReaderから一貫性がなく、理解できないエラーが発生します。最後の手段としてJdbcCursorItemReaderをデバッグする必要があります。私はこれの前にいくつかの助けを得ることを望んでいるとうまくいけば、構成の問題になります。

ERROR: Caused by: java.sql.SQLException: Exhausted Resultset at oracle.jdbc.driver.OracleResultSetImpl.getInt(OracleResultSetImpl.java:901) ~[ojdbc6-11.2.0.2.0.jar:11.2.0.2.0] at org.springframework.jdbc.support.JdbcUtils.getResultSetValue(JdbcUtils.java:160) ~[spring-jdbc-4.3.4.RELEASE.jar:4.3.4.RELEASE] at org.springframework.jdbc.core.BeanPropertyRowMapper.getColumnValue(BeanPropertyRowMapper.java:370) ~[spring-jdbc-4.3.4.RELEASE.jar:4.3.4.RELEASE] at org.springframework.jdbc.core.BeanPropertyRowMapper.mapRow(BeanPropertyRowMapper.java:291) ~[spring-jdbc-4.3.4.RELEASE.jar:4.3.4.RELEASE] at org.springframework.batch.item.database.JdbcCursorItemReader.readCursor(JdbcCursorItemReader.java:139) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE]

設定クラス:

@Configuration @EnableBatchProcessing public class BatchConfiguration { 

    @Bean 
    public ItemProcessor<Archive, Archive> processor(@Value("${etl.region}") String region) { 
     return new ArchiveProcessor(region); 
    } 

    @Bean 
    public ItemWriter<Archive> writer(@Qualifier(value = "postgres") DataSource dataSource) { 
     JdbcBatchItemWriter<Archive> writer = new JdbcBatchItemWriter<>(); 

     writer.setSql("insert into tdw_src.archive (id) " + 
       "values (:id)"); 
     writer.setDataSource(dataSource); 
     writer.setItemSqlParameterSourceProvider(new org.springframework.batch.item.database. 
       BeanPropertyItemSqlParameterSourceProvider<>()); 
     return writer; 
    } 

    @Bean 
    public Partitioner archivePartitioner(@Qualifier(value = "gmDataSource") DataSource dataSource, 
              @Value("ROWNUM") String column, 
              @Value("archive") String table, 
              @Value("${gm.datasource.username}") String schema) { 
     return new ColumnRangePartitioner(dataSource, column, schema + "." + table); 
    } 

    @Bean 
    public Job archiveJob(JobBuilderFactory jobs, Step partitionerStep, JobExecutionListener listener) { 
     return jobs.get("archiveJob") 
       .preventRestart() 
       .incrementer(new RunIdIncrementer()) 
       .listener(listener) 
       .start(partitionerStep) 
       .build(); 
    } 

    @Bean 
    public Step partitionerStep(StepBuilderFactory stepBuilderFactory, 
           Partitioner archivePartitioner, 
           Step step1, 
           @Value("${spring.batch.gridsize}") int gridSize) { 
     return stepBuilderFactory.get("partitionerStep") 
       .partitioner(step1) 
       .partitioner("step1", archivePartitioner) 
       .gridSize(gridSize) 
       .taskExecutor(taskExecutor()) 
       .build(); 
    } 

    @Bean(name = "step1") 
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Archive> customReader, 
         ItemWriter<Archive> writer, ItemProcessor<Archive, Archive> processor) { 
     return stepBuilderFactory.get("step1") 
       .listener(customReader) 
       .<Archive, Archive>chunk(5) 
       .reader(customReader) 
       .processor(processor) 
       .writer(writer) 
       .build(); 
    } 

    @Bean 
    public TaskExecutor taskExecutor(){ 
     return new SimpleAsyncTaskExecutor(); 
    } 

    @Bean 
    public SimpleJobLauncher getJobLauncher(JobRepository jobRepository) { 
     SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); 
     jobLauncher.setJobRepository(jobRepository); 
     return jobLauncher; 
    } 

Custom Reader:- 

public class CustomReader extends JdbcCursorItemReader<Archive> implements StepExecutionListener { 

    private StepExecution stepExecution; 

    @Autowired 
    public CustomReader(@Qualifier(value = "gmDataSource") DataSource geomangerDataSource, 
         @Value("${gm.datasource.username}") String schema) throws Exception { 
     super(); 
     this.setSql("SELECT TMP.* FROM (SELECT ROWNUM AS ID_PAGINATION, id FROM " + schema + ".archive) TMP " + 
       "WHERE TMP.ID_PAGINATION >= ? AND TMP.ID_PAGINATION <= ?"); 
     this.setDataSource(geomangerDataSource); 
     BeanPropertyRowMapper<Archive> rowMapper = new BeanPropertyRowMapper<>(Archive.class); 
     this.setRowMapper(rowMapper); 
     this.setFetchSize(5); 
     this.setSaveState(false); 

     this.setVerifyCursorPosition(false); 
// not sure if this is needed?  this.afterPropertiesSet(); 
    } 

    @Override 
    public synchronized void beforeStep(StepExecution stepExecution) { 
     this.stepExecution = stepExecution; 
     this.setPreparedStatementSetter(getPreparedStatementSetter()); 
    } 

    private PreparedStatementSetter getPreparedStatementSetter() { 
     ListPreparedStatementSetter listPreparedStatementSetter = new ListPreparedStatementSetter(); 
     List<Integer> list = new ArrayList<>(); 
     list.add(stepExecution.getExecutionContext().getInt("minValue")); 
     list.add(stepExecution.getExecutionContext().getInt("maxValue")); 
     listPreparedStatementSetter.setParameters(list); 
     LOGGER.debug("getPreparedStatementSetter list: " + list); 
     return listPreparedStatementSetter; 
    } 

    @Override 
    public ExitStatus afterStep(StepExecution stepExecution) { 
     return null; 
    } 
} 
+0

部品として顧客リーダーを除去し、次のようにconfigのバッチに添加:@Bean 公共ItemReader リーダー(@Qualifier(値= "gmDataSource")データソースgeomangerDataSource、 @value( "$ {} geomanager.datasource.username ")文字列スキーマ)例外をスローする{ return新しいCustomReader(geomangerDataSource、schema);ます。java.sql.SQLException:によって引き起こさ :oracle.jdbc.driver.OracleResultSetImpl.getTimestamp(OracleResultSetImpl.java:1381)で疲れ結果セット \t〜[ojdbc6-11.2.0.2まだ同じエラーを取得 } 。 0.jar:11.2.0.2.0] – user103122

答えて

0

私はこのすべての作業を持っています。

私のCustomReaderでselect文を注文する必要があったので、rownumはすべてのスレッドで同じままで、最後にステップで使用する各Beanに対して@StepScopeを使用してBeanのスコープを設定する必要がありました。

実際には私はrownumを使用していません。これは順序を決めて緩やかなパフォーマンスを低下させるため、最高のパフォーマンスを得るためにpk列を使用します。