2017-11-30 9 views
0

Spring Batchに問題があります。我々の仕事では、2つの並列ステップのフローを処理するタスク実行プログラム(simpleAsyncTaskExecutor)を使用しています。各ステップにおいてSpring Batch:ジョブは、コミット間隔よりも少ない行のチャンクをランダムに選択します。

、タスクエグゼキュータは、(マルチスレッドステップの概念を使用して:https://docs.spring.io/spring-batch/trunk/reference/html/scalability.htmlを参照)は、異なるスレッドにリーダーによって返されたデータの各チャンクを分割

問題は、我々のコミット間隔が大きいことです(24,000)、読者が返す行の数は非常に少なく(50行未満)、ライターは時々複数のチャンクを受け取ります(例えば、30行のチャンクと20行のチャンク、 25行のチャンク、25行のチャンク、または50行の1チャンクであることができます。ランダムであるように見えますが、任意の実行に対して50行のチャンクを1つだけ受け取る必要があります(ランダムではありません)。それはコミット間隔を超えません。

私はなぜこれがいくつかの実行でランダムに発生する理由を理解しようとしています。 誰でもSpring Batchでこの問題を知っていれば、助けてくれますか?

ありがとうございます。ここで

は(私たちのカスタム作家を除く)私の仕事の構成は次のとおりです。

<batch:job id="job"> 

      <batch:split id="split" task-executor="taskExecutor"> 
       <batch:flow> 
        <batch:step id="step1"> 
         <batch:tasklet task-executor="taskExecutor" throttle-limit="4" > 
           <batch:chunk reader="reader1" writer="writer1" commit-interval="24000" />   
         </batch:tasklet> 
        </batch:step> 
       </batch:flow> 
       <batch:flow> 

        <batch:step id="step2"> 
         <batch:tasklet task-executor="taskExecutor" throttle-limit="4" > 
           <batch:chunk reader="reader2" writer="writer2" commit-interval="24000" />   
         </batch:tasklet> 
        </batch:step> 
       </batch:flow>  
      </batch:split> 
     </batch:job> 

    <bean id="reader1" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">  
     <property name="dataSource" ref="postgresql_1" />       

     <property name="queryProvider">    
      <bean class="org.springframework.batch.item.database.support.PostgresPagingQueryProvider"> 
        <property name="selectClause" value=" 
         SELECT name 
        " /> 
        <property name="fromClause" value=" 
         FROM database.people 
        " /> 
        <property name="whereClause" value=" 
         WHERE age > 30 
        " /> 
        <property name="sortKeys"> 
         <map> 
          <entry key="people_id" value="ASCENDING"/> 
         </map> 
        </property>    
      </bean>   
     </property>   
     <property name="saveState" value="false" />   
     <property name="rowMapper">    
      <bean class="fr.myapp.PeopleRowMapper" />  
     </property>  
    </bean> 

     <bean id="reader2" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">  
     <property name="dataSource" ref="postgresql_1" />   
     <property name="queryProvider">   
       <bean class="org.springframework.batch.item.database.support.PostgresPagingQueryProvider"> 
        <property name="selectClause" value=" 
         SELECT product_name 
        " /> 
        <property name="fromClause" value=" 
         FROM database.products 
        " /> 
        <property name="whereClause" value=" 
         WHERE product_order_date <= '01/11/2017' 
        " /> 
        <property name="sortKeys"> 
         <map> 
          <entry key="product_id" value="ASCENDING"/> 
         </map> 
        </property>    
       </bean>  
      </property>  
     <property name="saveState" value="false" />  
     <property name="rowMapper"> 
       <bean class="fr.myapp.ProductsRowMapper" /> 
     </property> 
    </bean> 
    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"> 
      <property name="concurrencyLimit" value="8" /> 
    </bean> 

答えて

0

書くために、レコードの保留数がコミット・インターバルに達するまで、作家のwrite()メソッドは(と呼ばれるべきではありませんまたはチャンクサイズ)。その前に呼び出されるべき唯一の時間は、read()が結果を残さないことを示すリーダーでnullを返すかどうかです。

小さいチャンクが表示されたら、それはジョブの途中ですか? RowMappers内にロジックがありますか、または読み込み結果をロールアップするために省略したものはありますか?

+0

こんにちはジョーはあなたの答えをありがとう。私のポストで言ったように、この動作は無作為であると思われます(読者が50の1チャンクを送信することがありますが、時には2つの小さなチャンクがあります)ので、どこかのロジックよりもバグのようです。 PersonRowMapperは行を正しいPersonBeanにのみマップします。私は、ライターがread()メソッドを実行した後にwriter()メソッドが呼び出されたときに、2つのスレッドで1つのチャンクまたは2つの小さなチャンクを受け取ることがわかりました。 – rBenks93

関連する問題