2012-04-30 11 views
2

私はHadoop 0.20.0/0.20.2のCombineFileInputFormatを使用して、1レコードにつき1ファイルを処理し、データローカリティ(通常は処理する)に妥協しないようにします。HadoopでCombineFileInputFormatを使用するには?

Tom WhiteのHadoop Definitive Guideに記載されていますが、彼はその方法を示していません。代わりに、彼はシーケンスファイルに移動します。

私は、レコードリーダーで処理された変数の意味をかなり混乱しています。 コード例は非常に役立ちます。

ありがとうございます。

+0

「レコードごとに1つのファイル」の意味を詳しく説明できますか? –

答えて

1

以下のファイル入力形式を組み合わせた入力形式を確認してください。

import java.io.IOException; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; 


/** 
* CustomInputformat which implements the createRecordReader of abstract class CombineFileInputFormat 
*/ 

public class MyCombineFileInputFormat extends CombineFileInputFormat { 

    public static class MyRecordReader extends RecordReader<LongWritable,Text>{ 
     private LineRecordReader delegate=null; 
     private int idx; 

     public MyRecordReader(CombineFileSplit split,TaskAttemptContext taskcontext ,Integer idx) throws IOException { 
      this.idx=idx; 
      delegate = new LineRecordReader(); 
     } 

     @Override 
     public void close() throws IOException { 
      delegate.close(); 
     } 

     @Override 
     public float getProgress() { 
      try { 
       return delegate.getProgress(); 
      } 
      catch(Exception e) { 
       return 0; 
      } 
     } 

     @Override 
     public void initialize(InputSplit split, TaskAttemptContext taskcontext) throws IOException { 
      CombineFileSplit csplit=(CombineFileSplit)split; 
      FileSplit fileSplit = new FileSplit(csplit.getPath(idx), csplit.getOffset(idx), csplit.getLength(idx), csplit.getLocations()); 
      delegate.initialize(fileSplit, taskcontext); 
     } 

     @Override 
     public LongWritable getCurrentKey() throws IOException, 
       InterruptedException { 
      return delegate.getCurrentKey(); 
     } 


     @Override 
     public Text getCurrentValue() throws IOException, InterruptedException { 
      return delegate.getCurrentValue(); 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      return delegate.nextKeyValue(); 
     } 

    } 

    @SuppressWarnings("unchecked") 
    @Override 
    public RecordReader createRecordReader(InputSplit split,TaskAttemptContext taskcontext) throws IOException { 
     return new CombineFileRecordReader((CombineFileSplit) split, taskcontext, MyRecordReader.class); 
    } 
} 
0

これは、いわゆる「新しいAPI」からのCombineFileInputFormatを使用する最も簡単な方法です。あなたの実際の入力形式はMyFormatであると仮定し、それが(例えば、SequenceFileInputFormat< MyKey, MyValue >のいくつかのサブクラスであるかもしれない)MYKEYの鍵とMyValueの値で動作します。あなたの仕事のドライバで

public class CombinedMyFormat extends CombineFileInputFormat< MyKey, MyValue > { 
    // exists merely to fix the key/value types and 
    // inject the delegate format to the superclass 
    // if MyFormat does not use state, consider a constant instead 
    private static class CombineMyKeyMyValueReaderWrapper 
    extends CombineFileRecordReaderWrapper< MyKey, MyValue > { 
     protected CombineMyKeyMyValueReaderWrapper(
      CombineFileSplit split, TaskAttemptContext ctx, Integer idx 
     ) throws IOException, InterruptedException { 
      super(new MyFormat(), split, ctx, idx); 
     } 
    } 

    @Override 
    public RecordReader< MyKey, MyValue > createRecordReader(
     InputSplit split, TaskAttemptContext ctx 
    ) throws IOException { 
     return new CombineFileRecordReader< MyKey, MyValue >(
      (CombineFileSplit)split, ctx, CombineMyKeyMyValueReaderWrapper.class 
     ); 
    } 
} 

、あなたは今だけのMyFormatためCombinedMyFormatにドロップすることができるはずです。 Hadoopが入力全体を1つの分割にまとめるのを防ぐには、max split size propertyも設定する必要があります。

関連する問題