2016-05-23 5 views
3

は、私は私のソーステーブルに10件のレコードを持っていないと私は3JSR 352 - ItemReaderは、DBクエリ

としてアイテム数を持っています、私は(これらの10件のレコードを処理するために2つのパーティションを持っている時にチェックポイントを実装する方法最初の5つのレコードは最初のパーティションで処理され、残りのレコードは2番目のパーティションで処理されている間に2番目のパーティションで処理されます。2番目のパーティションの2番目のチャンクでジョブが失敗するように例外がスローされます。 ジョブを再起動すると、最後に失敗したチャンクレコードだけで処理する必要がありますが、そのパーティション内のすべてのレコードを処理する必要はありません。これを実現する方法を教えてください。

私のJSLは、以下のようなものです:

public class DBItemReader implements ItemReader { 
    @Inject 
    @BatchProperty 
    private String dsJNDI; 

    @Inject 
    @BatchProperty 
    private String whereclauseFrom; 


    @Inject 
    @BatchProperty 
    private String tableName; 

    private Connection conn =null; 
    private int totalRecords=0; 

    private DataSource ds = null; 
    List<RecObj> listRecObj=new ArrayList<RecObj>();  

    @Override 
    public Object readItem() throws SQLException { 
     if (listRecObj.size() == 0) {    
      return null; 
     } else { 
      RecObj rec =null;   
      Iterator<RecObj> iter =listRecObj.iterator(); 
      while (iter.hasNext()) {    
       rec = iter.next();    

       if (Integer.parseInt(rec.getRec()) == 7) {      
        throw new IllegalStateException("Thrown Error"); 
       } 
       iter.remove(); 
       return rec; 
      } 
      return rec; 
     } 


    @Override 
    public void open(Serializable arg0) throws NamingException, SQLException { 
      ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI)); 
//  System.out.println("whereclauseFrom: " + whereclauseFrom);   
      conn = ds.getConnection(); 
      String sql =""; 
      if(Integer.parseInt(whereclauseFrom) == 5){ 
       sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) <= "+ whereclauseFrom; 
      }else if(Integer.parseInt(whereclauseFrom) == 6){ 
       sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) >= "+ whereclauseFrom; 
      } 

      PreparedStatement ps = conn.prepareStatement(sql); 
      ResultSet rs=ps.executeQuery(); 
      while(rs.next()){ 
      totalRecords++; 
      String rec=rs.getString("REC"); 
      if(rec != null) 
       listRecObj.add(new RecObj(rec)); 

      }   
      rs.close();   
    } 
    @Override 
    public void close() throws SQLException { 
     conn.close();  
    } 
    @Override 
    public Serializable checkpointInfo() {  
      return null; 
    } 

} 
    } 

MYライターのクラスは以下のようなものです::

<?xml version="1.0" encoding="UTF-8"?> 
<job xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee" 
    xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd" 
    id="readingfrom-db" restartable="true" version="1.0" > 
    <properties > 
     <property name="numRec" value="#{jobParameters['numRec']}?:5;"/>   
     <property name="chunkSize" value="#{jobParameters['chunkSize']}?:3;"/> 
     <property name="whereclauseFrom" value="#{jobParameters['whereclauseFrom']}?:5;"/> 
     <property name="whereclauseTo" value="#{jobParameters['whereclauseTo']}?:6;"/>  
     <property name="dsJNDI" value="#{jobParameters['dsJNDI']}?:jdbc/db2;"/> 
     <property name="dsJNDI1" value="#{jobParameters['dsJNDI1']}?:jdbc/db2;"/> 
     <property name="tableName" value="#{jobParameters['tableName']}?:CISDW.AIF1_CH;"/> 
     <property name="ProcesstableName" value="#{jobParameters['ProcesstableName']}?:CISDW.PROC_AIF1_CH;"/> 
    </properties> 
    <step id="runcache" next="readFromDB"> 
     <batchlet ref="com.cdc.runcache.CacheRunnerBatchlet" /> 
    </step> 
    <step id="readFromDB"> 
     <listeners> 
      <listener ref="com.cdc.dbreader.LogExceptionListener"/> 
     </listeners> 
     <chunk item-count="3" checkpoint-policy="item"> 
      <reader ref="com.cdc.dbreader.DBItemReader"> 
       <properties > 
        <property name="dsJNDI" value="#{jobProperties['dsJNDI']}"/> 
        <property name="tableName" value="#{jobProperties['tableName']}"/> 
        <property name="whereclauseFrom" value="#{partitionPlan['modrec']}"/>     
       </properties> 
      </reader> 
      <processor ref="com.cdc.dbreader.DBItemProcessor" />    
      <writer ref="com.cdc.dbreader.DBItemWriter"> 
       <properties > 
        <property name="dsJNDI" value="#{jobProperties['dsJNDI1']}"/> 
        <property name="tableName" value="#{jobProperties['ProcesstableName']}"/> 
       </properties> 
      </writer> 
     </chunk> 
     <partition> 
      <plan partitions="2" threads="2"> 
       <properties partition="0"> 
        <property name="modrec" value="#{jobProperties['whereclauseFrom']}"/>     
       </properties> 
       <properties partition="1"> 
        <property name="modrec" value="#{jobProperties['whereclauseTo']}"/> 
       </properties> 
      </plan> 
     </partition>   
    </step>      
</job> 

マイアイテムのリーダーは、以下のようなものです以下

public class DBItemWriter extends AbstractItemWriter implements ItemWriter {  
    @Inject 
    @BatchProperty 
    private String dsJNDI; 

    @Inject 
    @BatchProperty 
    private String tableName; 

    private DataSource ds = null; 

    @Override 
    public void open(Serializable arg0) throws NamingException { 
     ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI));    
    } 

    @Override 
    public void writeItems(List<java.lang.Object> items) throws BatchUpdateException,SQLException{  
     Connection conn = ds.getConnection();   
     String sql = "INSERT INTO "+tableName+ "(MOD_REC) VALUES(?) ";  
     PreparedStatement ps = conn.prepareStatement(sql);   
     for (Object obj : items) {   
      RecObj v = (RecObj)obj; 
      System.out.println("=======Writer values===="+v.getRec());     
      ps.setString(1, v.getRec());    
      ps.addBatch(); 
     }   
     ps.executeBatch(); 
     ps.clearBatch(); 
     ps.close(); 
     conn.close(); 
    } 
} 

は私のプロセッサである。

public class DBItemProcessor implements ItemProcessor { 
    Integer count=0; 
    @Override 
    public Object processItem(Object arg0) { 
     count++; 
     RecObj v=(RecObj)arg0; 
     String vname=v.getRec(); 
     System.out.println("=========Processer Values==="+vname); 
     return new RecObj(vname+count); 
    } 
} 
01以下

は私のBeanクラスは、あなたがあなたの読者のオープン()の再起動時にメソッドに渡されます、あなたの読者のcheckpointInfo()でのチェックポイント値を返す必要が

public class RecObj { 
    private String rec; 


    public RecObj(String rec) { 
    this.rec=rec; 
} 
+0

を実装していますが、基本的な理解が正しいを持っているようですね、なぜそうしませんジョブを定義するために使用されるJSL(XML)とともにいくつかのコードスニペットを共有します。特に、チェックポイント値に基づいて位置決めを実装するために使用するコードだけでなく、各パーティションの読者(** @ BatchProperty **値を注入する)をどのようにパラメータ化するのかを理解するのに役立ちます。 –

+0

私は自分の疑問のすべてのアーティファクトを追加しました。私のコードを確認してください。私の問題を解決する方法を教えてください。 –

+0

まだ間違ったことはありません。チェックポイントの値を使用する読者の** open()**メソッドを追加してDBクエリを構築するだけでなく、2番目のチャンクにexcを投げたコードであれば、もう一度見ていきます。 –

答えて

1

です。これは、再起動時にチェックポイントを提供するために、リーダーとバッチコンテナがどのように連携するかを示します。

ですから(CHECKPOINTコメントを探してください)のようなものを持つことができます:

パブリッククラスDBItemReaderがItemReader {

// ... 

// CHECKPOINT field defined 
private String checkpoint = null; 

@Override 
public void open(Serializable checkpoint) throws NamingException, SQLException { 

    // CHECKPOINT-based positioning through query value. 
    // Initial position = whereclauseFrom, on restart set to checkpoint 
    String queryVal = (String)(checkpoint == null ? whereclauseFrom : checkpoint);  

     if(Integer.parseInt(whereclauseFrom) == 5){ 
      sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) <= "+ queryVal; 
     }else if(Integer.parseInt(whereclauseFrom) == 6){ 
      sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) >= "+ queryVal; 
     } 
// .. 
} 

@Override 
public Object readItem() throws SQLException { 
    if (listRecObj.size() == 0) {    
     return null; 
    } else { 
     RecObj rec =null;   
     Iterator<RecObj> iter =listRecObj.iterator(); 
     while (iter.hasNext()) {    
      rec = iter.next();    
      // CHECKPOINT updated 
      checkpoint = rec.getRec(); 
      if (Integer.parseInt(rec.getRec()) == 7) {      
       throw new IllegalStateException("Thrown Error"); 
      } 
    // ... 
@Override 
public Serializable checkpointInfo() {  
     // CHECKPOINT returned at end of chunk 
     return checkpoint; 
} 
+0

例にはもう1つの問題があります。提案された更新を行ったとしても、レコード#7で失敗した場合でも、2番目のパーティションを再起動しても、最初から開始します。これは、アイテム数が3で6で始まる場合、最初のチャンクはレコード6,7,8で構成されるためです。だから、実際には最初のチャンクではなく、2番目のチャンクで失敗しています。 –

+0

Scottさん、ありがとうございます。あなたの迅速なお世話に感謝します。 –

+0

if(整数。{0} == 5 && arg0 == null){ \t \t sql = "SELECT * FROM" + tableName + "WHERE CAST(REC AS INTEGER)<=" + whereclauseFrom; + + arg0 == null){ \t \t sql = "SELECT * FROM" + tableName + "WHERE CAST(REC AS INTEGER)> =" + whereclauseFrom; \t} \t} else { \t \t sql = "SELECT * FROM" + tableName + "WHERE CAST(REC AS INTEGER)>" + queryVal; \t} –

関連する問題