2016-07-31 16 views
0

パーティーを使用してマルチスレッドを作成しようとしたときに読み込みが始まる前にリーダーを開く必要があります。 JdbcCursorItemReaderは多分、なぜ私はこのエラーを取得していますので、スレッドセーフされていない場合JdbcCursorItemReaderのパーティショナー - 読み込み可能にする前にリーダーを開く必要があります

org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read. 
    at org.springframework.batch.item.database.AbstractCursorItemReader.doRead(AbstractCursorItemReader.java:443) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:88) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_101] 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_101] 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_101] 
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_101] 
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208) ~[spring-aop-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at com.sun.proxy.$Proxy58.read(Unknown Source) ~[na:na] 
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:91) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:157) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:116) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:110) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) ~[spring-tx-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) ~[spring-batch-infrastructure-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101] 

2016-08-01 14:26:14.700 ERROR 12716 --- [   main] o.s.batch.core.step.AbstractStep   : Encountered an error executing step partitionStep in job exportMasterListCsv 

org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step 
    at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:112) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200) ~[spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:392) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128) [spring-batch-core-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at org.springframework.batch.test.JobLauncherTestUtils.launchJob(JobLauncherTestUtils.java:152) [spring-batch-test-3.0.7.RELEASE.jar:3.0.7.RELEASE] 
    at com.myer.pricing.onlinestore.export.job.ExportMasterListCsvTest.testLaunchJob(ExportMasterListCsvTest.java:60) [test-classes/:na] 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_101] 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_101] 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_101] 
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_101] 
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) [junit-4.11.jar:na] 
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.11.jar:na] 
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) [junit-4.11.jar:na] 
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) [junit-4.11.jar:na] 
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) [junit-4.11.jar:na] 
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) [junit-4.11.jar:na] 
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:254) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:89) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) [junit-4.11.jar:na] 
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) [junit-4.11.jar:na] 
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) [junit-4.11.jar:na] 
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) [junit-4.11.jar:na] 
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) [junit-4.11.jar:na] 
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309) [junit-4.11.jar:na] 
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:193) [spring-test-4.2.6.RELEASE.jar:4.2.6.RELEASE] 
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) [.cp/:na] 
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) [.cp/:na] 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) [.cp/:na] 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678) [.cp/:na] 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) [.cp/:na] 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) [.cp/:na] 

私は思っていましたか?ここで

@Configuration 
public class ExportMasterListCsvJobConfig { 

public static final String JOB_NAME = "exportMasterListCsv"; 

@Autowired 
public JobBuilderFactory jobBuilderFactory; 

@Autowired 
public StepBuilderFactory stepBuilderFactory; 

@Bean 
public TaskExecutor taskExecutor() { 
    return new SimpleAsyncTaskExecutor(); 
} 


/** 
* Create job to export the master list from the online pricing staging db. 
* 
* @param readStgDbAndExportMasterListStep read online staging db step and export to csv 
* @return job to export the master list from the online pricing staging db. 
*/ 
@Bean 
public Job exportMasterListCsvJob(@Qualifier("partitionStep") Step partitionStep) { 

    return jobBuilderFactory.get(JOB_NAME) 
      .incrementer(new RunIdIncrementer()) 
      .start(partitionStep) 
      .build(); 
} 

    @Bean 
    public Step partitionStep(@Qualifier("readStgDbAndExportMasterListStep") Step readStgDbAndExportMasterListStep){ 
    return stepBuilderFactory.get("partitionStep") 
     .partitioner(readStgDbAndExportMasterListStep) 
     .partitioner("readStgDbAndExportMasterListStep", partitioner()) 
     .taskExecutor(taskExecutor()) 
     .build(); 
    } 

/** 
* Read from online pricing staging db. 
* 
* @param chunkSize 
* @param queryOnlineStagingDbReaderForMasterList 
* @param masterListOutputProcessor 
* @param masterListFileWriter 
* @return step to read from online pricing staging db. 
*/ 
@Bean 
public Step readStgDbAndExportMasterListStep(
     @Value("${exportMasterListCsv.generateMasterListRows.chunkSize}") int chunkSize, 
     @Qualifier("queryOnlineStagingDbReaderForMasterList") ItemReader<MasterList> queryOnlineStagingDbReaderForMasterList, 
     @Qualifier("masterListOutputProcessor") ItemProcessor<MasterList,MasterList> masterListOutputProcessor, 
     @Qualifier("masterListFileWriter") ItemWriter<MasterList> masterListFileWriter) { 

    return stepBuilderFactory.get("readStgDbAndExportMasterListStep") 
       .<MasterList,MasterList>chunk(chunkSize) 
       .reader(queryOnlineStagingDbReaderForMasterList) 
       .processor(masterListOutputProcessor) 
       .writer(masterListFileWriter) 
       .build(); 

} 

/** 
* Query and map rows from online pricing staging db into a master list object. 
* 
* @param onlineStagingDb 
* @param masterListSql 
* @return 
*/ 
@Bean 
@StepScope 
public ItemReader<MasterList> queryOnlineStagingDbReaderForMasterList(
     DataSource onlineStagingDb, 
     @Value("#{stepExecutionContext[divisionId]}") Integer divisionId, 
     @Value("${exportMasterListCsv.generateMasterListRows.masterListSql}") String masterListSql) { 

    JdbcCursorItemReader<MasterList> reader = new JdbcCursorItemReader<>(); 

    reader.setDataSource(onlineStagingDb); 
    reader.setSql(masterListSql); 
    reader.setPreparedStatementSetter(new PreparedStatementSetter() { 
     public void setValues(PreparedStatement ps) throws SQLException { 
      ps.setInt(1, divisionId); 
     } 
    }); 


    reader.setRowMapper(new RowMapper<MasterList>() { 
     @Override 
     public MasterList mapRow(ResultSet resultSet, int i) throws SQLException { 
      MasterList masterList = new MasterList(); 
      masterList.setL1(resultSet.getString(DaoConstants.COLUMN_HEADER_LEVEL_ONE)); 

      return masterList; 
     } 
    });   
    return reader; 
} 

@Bean 
public Partitioner partitioner(){ 
    return new DivisionPartitioner(); 
} 

は、私はネット上でいくつかの読書を行ってきた...私は私の仕事を設定しているかおおよそです。おそらくhttps://jira.spring.io/browse/BATCH-1301に関連していますか? おかげ

+0

コードの一部のみが正しく貼り付けられました。 'queryOnlineStagingDbReaderForMasterList'は切り捨てられます。 – Brad

+0

あなたのためにもう少し私の実装を追加しました – Richie

答えて

1

JdbcCursorItemReaderは多分、なぜ私はこのエラーを取得していますので、 スレッドセーフされていない場合、私はちょうど思っていましたか?

JdbcCursorItemReaderはAbstractCursorItemReaderで、これはAbstractItemCountingItemStreamItemReaderです。

AbstractItemCountingItemStreamItemReaderのドキュメントの実装によるとのExecutionContextに 項目数を格納することで、再起動をサポートしていますItemReadersためないスレッドセーフ

http://docs.spring.io/spring-batch/apidocs/org/springframework/batch/item/support/AbstractItemCountingItemStreamItemReader.html

抽象スーパークラスは(そのために を注文するアイテムを必要としています実行の間に保存される)。 サブクラスは本質的に スレッドセーフではありません。

スレッド問題へのログのヒントのこの部分:

at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_101] 
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101] 

は一つだけのスレッドを使用するのTaskExecutorを設定することによって、それをシーケンシャルに実行してみてください。

public TaskExecutor taskExecutor() { 
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); 
    taskExecutor.setConcurrencyLimit(1); 
    return taskExecutor; 
} 
+0

こんにちは、concurrencyLimitを1に設定すると、同じエラーが表示されます。しかし、スタックトレースは、これがスレッディングと関係していることを示してくれてありがとう。誰かがアイデアを持っている場合、まだこの問題の解決策を探しています。ありがとう – Richie

+0

私はこのコードのBeanスコープで遊んできました。そして、私はどこかで私を持っていると思います。私は豆をどのようにスコープするかについて異なる質問があります。私はこの質問を締め切り、スコープの質問に固有の新しい質問を始めます。助けてくれてありがとう – Richie

+1

それがあなたを助けてくれてうれしい。Beanのスコープに関しては、プロパティのバインド(いつの時点で)と同様に、http://docs.spring.io/spring-batch/reference/html/configureStep.html#stepを参照してください。 -範囲 –

0

初めて読むときは、読者が準備が整うまでに時間がかかります。時にはrowMapperが先に進まず、読者がまだ開かれていないときに読者が開​​いていないと思うことがあります。この問題を解決するには、初めて使用するときにrowMapperをスリープ状態にしてください。次のブロックをsetRowMapperに追加し、エラーがなくなるかどうかを確認します。

try { 
    if(firstTime) { 
     Thread.sleep(20000); 
     firstTime = false; 
    } 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} 
関連する問題