2016-03-28 44 views
2

私は100Kから500Kのレコードを含む大きなファイルを持っています。私はチャンク指向の処理を使用するつもりです。私の考えは春のバッチを使用して大きなファイルを処理する

です。1)大きなファイルを、それぞれのファイルの中で10Kと言います。

2)100Kレコードがある場合、私は10個のファイル10K reocrds

3を含む各を取得します)私はこれらの10個のファイルを分割したいと5つのスレッドを使用して処理したいと思います。私はカスタムを使用することを考えていますMultiResourcePartioner

4)5つのスレッドは、分割プロセスで作成されたすべての10個のファイルを処理する必要があります。

5)メモリの問題に直面する可能性がある場合と同じ数のスレッドをファイル数と同じに作成したくありません。私が探しているのは、5つのスレッドだけを使用して処理したいファイルの数が何であれ(私の要求に応じて増やすことができます)です。

これは春のバッチを使用して達成できますか?はいあなたはポインタまたは参照実装を共有してください可能性がある場合、事前

おかげで作業ジョブ設定XML

<description>Spring Batch File Chunk Processing</description> 

<import resource="../config/batch-context.xml" /> 

<batch:job id="file-partition-batch" job-repository="jobRepository" restartable="false">   
    <batch:step id="master"> 
     <batch:partition partitioner="partitioner" handler="partitionHandler" /> 
    </batch:step> 
</batch:job> 

<batch:step id="slave"> 
    <batch:tasklet> 
     <batch:chunk reader="reader" processor="compositeProcessor" 
      writer="compositeWriter" commit-interval="5"> 
     </batch:chunk> 
    </batch:tasklet> 
</batch:step> 

<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler"> 
    <property name="taskExecutor" ref="taskExecutor"/> 
    <property name="step" ref="slave" /> 
    <property name="gridSize" value="5" /> 
</bean> 

<bean id="partitioner" class="com.poc.partitioner.FileMultiResourcePartitioner"> 
    <property name="resources" value="file:/Users/anupghosh/Documents/Spring_Batch/FilePartitionBatch/*.txt" /> 
    <property name="threadName" value="feed-processor" /> 
</bean> 

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
    <property name="corePoolSize" value="5" /> 
    <property name="maxPoolSize" value="5" /> 
</bean> 

<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"> 
    <property name="resource" value="#{stepExecutionContext['fileName']}" /> 

    <property name="lineMapper"> 
     <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> 
     <property name="lineTokenizer"> 
      <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> 
       <property name="delimiter" value="|"/> 
       <property name="names" value="key,docName,docTypCD,itemType,itemNum,launchDate,status" /> 
      </bean> 
     </property> 
     <property name="fieldSetMapper"> 
      <bean class="com.poc.mapper.FileRowMapper" /> 
     </property> 
     </bean> 
    </property> 
</bean> 

<bean id="validatingProcessor" class="org.springframework.batch.item.validator.ValidatingItemProcessor"> 
    <constructor-arg ref="feedRowValidator" /> 
</bean> 

<bean id="feedProcesor" class="com.poc.processor.FeedProcessor" /> 

<bean id="compositeProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor" scope="step"> 
    <property name="delegates"> 
     <list> 
      <ref bean="validatingProcessor" /> 
      <ref bean="feedProcesor" /> 
     </list> 
    </property> 
</bean> 

<bean id="recordDecWriter" class="com.poc.writer.RecordDecWriter" /> 

<bean id="reconFlatFileCustomWriter" class="com.poc.writer.ReconFileWriter"> 
    <property name="reconFlatFileWriter" ref="reconFlatFileWriter" /> 
</bean> 

<bean id="reconFlatFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step"> 
    <property name="resource" value="file:/Users/anupghosh/Documents/Spring_Batch/recon-#{stepExecutionContext[threadName]}.txt" /> 
    <property name="shouldDeleteIfExists" value="true" /> 
    <property name="lineAggregator"> 
     <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator"> 
      <property name="delimiter" value="|" /> 
      <property name="fieldExtractor"> 
       <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"> 
        <property name="names" value="validationError" /> 
       </bean> 
      </property> 
     </bean> 
    </property> 
</bean> 

<bean id="compositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter"> 
    <property name="delegates"> 
     <list> 
      <ref bean="recordDecWriter" /> 
      <ref bean="reconFlatFileCustomWriter" /> 
     </list> 
    </property> 
</bean> 

<bean id="feedRowValidator" class="org.springframework.batch.item.validator.SpringValidator"> 
    <property name="validator"> 
     <bean class="com.poc.validator.FeedRowValidator"/> 
    </property> 
</bean>  

+0

あなたは本当にプロセスが正しくレイアウトされています。あなたの具体的な質問は何ですか? –

+0

短時間で応答するための@MichaelMinellaに感謝します。私の質問は:もし私が5つのスレッドを使ってそれらのファイルをどのように処理するのでしょうか?これら5つのスレッドがこれらの10個のファイルすべてを処理するように、これらのファイルをどのように分割するかを具体的に説明します。 –

+0

'MultiResourcePartitioner'は、パーティションを作成する正しい方法です。そこから、 'TaskExecutionPartitionHandler'を使用して、あなたの' TaskExecutor'の設定によって使用されるスレッドの数を制御します。デフォルトでは 'SyncTaskExecutor'を使いますが、' ThreadPoolTask​​Executor'のようなものを設定することを期待しています。その 'TaskExecutor'では、最大スレッドなどを設定することができます。 –

答えて

2

は、この使用してMultiResourcePartitionerを解決することができました。以下はJavaの設定です

@Bean 
    public Partitioner partitioner() { 
     MultiResourcePartitioner partitioner = new MultiResourcePartitioner(); 
     ClassLoader cl = this.getClass().getClassLoader(); 
     ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(cl); 
     Resource[] resources = resolver.getResources("file:" + filePath + "/"+"*.csv");  
     partitioner.setResources(resources); 
     partitioner.partition(10);  
     return partitioner; 
    } 

    @Bean 
    public TaskExecutor taskExecutor() { 
     ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); 
     taskExecutor.setMaxPoolSize(4); 
     taskExecutor.afterPropertiesSet(); 
     return taskExecutor; 
    } 

    @Bean 
    @Qualifier("masterStep") 
    public Step masterStep() { 
     return stepBuilderFactory.get("masterStep") 
       .partitioner(ProcessDataStep()) 
       .partitioner("ProcessDataStep",partitioner()) 
       .taskExecutor(taskExecutor()) 
       .listener(pcStressStepListener) 
       .build(); 
    } 


    @Bean 
    @Qualifier("processData") 
    public Step processData() { 
     return stepBuilderFactory.get("processData") 
       .<pojo, pojo> chunk(5000) 
       .reader(reader)    
       .processor(processor()) 
       .writer(writer)   
       .build(); 
    } 



    @Bean(name="reader") 
    @StepScope 
    public FlatFileItemReader<pojo> reader(@Value("#{stepExecutionContext['fileName']}") String filename) { 

     FlatFileItemReader<pojo> reader = new FlatFileItemReader<>(); 
     reader.setResource(new UrlResource(filename)); 
     reader.setLineMapper(new DefaultLineMapper<pojo>() { 
      { 
       setLineTokenizer(new DelimitedLineTokenizer() { 
        { 
         setNames(FILE HEADER); 


        } 
       }); 
       setFieldSetMapper(new BeanWrapperFieldSetMapper<pojo>() { 
        { 
         setTargetType(pojo.class); 
        } 
       }); 
      } 
     }); 
     return reader; 
    } 
+1

また、マルチスレッドの作業を行うためにtaskExecutor.setCorePoolSize(4)とtaskExecutor.setQueueCapacity(8)を追加する必要があります。 –

関連する問題