2016-07-05 8 views
0

パーティショニングにSpringバッチステップを設定しようとしています。良いサンプルは、hereは "id range"についてのパーティションを示していますが、 "データページ"の範囲でどこから始めるべきか分かりません。私のシーケンシャルステップSpringバッチステップPagingAndSortingRepositoryを使用したパーティショニング

、私は:

  • リーダー:データ変換
  • ライタ:5
  • :CrudRepository
  • chunckを用いRepositoryItemWriter PagingAndSortingRepository
  • プロセッサを使用してRepositoryItemReaderを
  • リスナー:aステップリスナー
私が理解してきたよう

return stepBuilderFactory.get("stepApplicationForm") .<OldApplicationForm, NewApplicationForm>chunk(5) .reader(reader).processor(processor).writer(writer) .listener(listener).build();

、partitionningのために、私は、その後、私は子供のステップでパーティショナを使用するように指示する「親」のステップを持って、パーティを作成するために、リーダーとし、「子」のステップを持っています「ページ区切り」パラメータを認識します。

TaskExecutorについては、私はThreadPoolTaskExecutorが適合すると思います。

データ "ページ"に基づいてパーティショニングを実装または設定するにはどうすればよいですか?そして、私がチェックしなければならないスレッディング・キャビットは何ですか?

感謝:)

答えて

1

各パーティションは、独自のアイテムリーダーとアイテムライターインスタンスがあります。パーティション実装では、データロードの最小値を見つけることができます。独自のロジックを使用すると、実行コンテキストで最小値と最大値を作成できます。データベースにクエリを実行する際に、これらを使用して特定のデータスライスを処理して、同時性の問題が発生しないようにすることができます。

@Bean 
public Step myMasterStep() { 
    return stepBuilderFactory.get("myMasterStep") 
      .partitioner("mySlaveWorker", myPartitioner()) 
      .partitionHandler(myPartitionHandler()).build(); 
} 


@Bean 
    public Step mySlaveWorker() { 
     return stepBuilderFactory 
       .get("mySlaveWorker") 
       .<OldApplicationForm, NewApplicationForm> chunk(5) 
       .faultTolerant() 
       .listener(MyStepListener()) 
       .skip(DataAccessException.class) 
       .skip(FatalStepExecutionException.class) 
       .skip(Exception.class) 
       .skipLimit(75) 
       .noRollback(DataAccessException.class) 
       .noRollback(FatalStepExecutionException.class) 
       .noRollback(Exception.class) 
       .reader(myDataItemReader()) 
       .writer(myDataItemWriter()).build(); 
    } 

@Bean 
@StepScope 
public MyDataItemReader myDataItemReader(
     @Value("#{stepExecutionContext[minId]}") Long minId, 
     @Value("#{stepExecutionContext[maxId]}") Long maxId) { 
    MyDataItemReader myDataItemReader = new MyDataItemReader(); 
    myDataItemReader.setPageSize(100); 
    myDataItemReader.setMinId(minId); 
    myDataItemReader.setMaxId(maxId); 
    return myDataItemReader; 
} 

@Bean 
@StepScope 
public MyDataItemWriter myDataItemWriter() { 
    return new MyDataItemWriter(); 
} 

@Bean 
@StepScope 
public MyPartitioner myPartitioner() { 
    MyPartitioner myPartitioner = new MyPartitioner(); 
    myPartitioner.setDataSource(dataSource); 
return myPartitioner; 
} 

public class MyStepListener implements SkipListener<OldApplicationForm, NewApplicationForm> { 

private static final Logger LOGGER = LoggerFactory.getLogger(MyStepListener.class); 


public void onSkipInProcess(OldApplicationForm item, Throwable t) { 
    LOGGER.error("onSkipInProcess" + t.getMessage()); 
} 

public void onSkipInRead(Throwable t) { 
    LOGGER.error("onSkipInRead " + t.getMessage()); 
} 


public void onSkipInWrite(NewApplicationForm item, Throwable t) { 
    //logs 
    LOGGER.error("In MyStepListener --> onSkipInProcess" + t.getMessage()); 
} 

} 
+0

上記のコードを使用すると、スキップメカニズムも使用できます。これは特に、不完全なレコードのためにバッチを失敗させたくない場合に便利です。 –

関連する問題